|
|
|
|
|
"""
|
|
|
mikufans录播姬弹幕XML文件读取器
|
|
|
|
|
|
支持解析mikufans录播姬生成的弹幕XML文件,包括:
|
|
|
- 弹幕数据
|
|
|
- 礼物数据
|
|
|
- SuperChat数据
|
|
|
- 舰长购买数据
|
|
|
- 录制信息
|
|
|
|
|
|
使用示例:
|
|
|
# 从文件读取
|
|
|
with DanmakuReader('danmaku.xml') as reader:
|
|
|
danmaku_list = reader.get_danmaku()
|
|
|
gift_list = reader.get_gifts()
|
|
|
|
|
|
# 从字符串读取
|
|
|
reader = DanmakuReader(xml_content.encode('utf-8'))
|
|
|
print(f"弹幕数量: {reader.get_danmaku_count()}")
|
|
|
"""
|
|
|
|
|
|
import io
|
|
|
import xml.etree.ElementTree as ET
|
|
|
from typing import TextIO, List, Optional
|
|
|
from dataclasses import dataclass
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
@dataclass
class Danmaku:
    """A single danmaku (on-screen comment) record.

    Attributes:
        time: Time at which the comment appears, in seconds.
        type: Display mode (1 = scrolling, 5 = top, 4 = bottom).
        fontsize: Font size.
        color: Color value.
        timestamp: Timestamp of the comment.
        pool: Danmaku pool.
        user_id: ID of the sending user.
        user_hash: Hash of the sending user.
        content: Text of the comment.
        user: Name of the sending user.
    """

    # Field order is part of the positional constructor; do not reorder.
    time: float
    type: int
    fontsize: int
    color: int
    timestamp: int
    pool: int
    user_id: int
    user_hash: int
    content: str
    user: str
|
|
|
|
|
|
|
|
|
@dataclass
class Gift:
    """A single gift record.

    Attributes:
        timestamp: Timestamp of the gift.
        user: Name of the sending user.
        uid: ID of the sending user.
        giftname: Name of the gift.
        giftcount: Number of gifts sent.
    """

    # Field order is part of the positional constructor; do not reorder.
    timestamp: float
    user: str
    uid: int
    giftname: str
    giftcount: int
|
|
|
|
|
|
|
|
|
@dataclass
class SuperChat:
    """A single SuperChat (paid message) record.

    Attributes:
        user: Name of the sending user.
        uid: ID of the sending user.
        content: Text of the SuperChat.
        time: Display duration.
        price: Price paid.
        timestamp: Timestamp of the SuperChat.
    """

    # Field order is part of the positional constructor; do not reorder.
    user: str
    uid: int
    content: str
    time: int
    price: float
    timestamp: float
|
|
|
|
|
|
|
|
|
@dataclass
class Guard:
    """A single guard ("captain") purchase record.

    Attributes:
        user: Name of the purchasing user.
        uid: ID of the purchasing user.
        level: Guard level.
        count: Number of units purchased.
        timestamp: Timestamp of the purchase.
    """

    # Field order is part of the positional constructor; do not reorder.
    user: str
    uid: int
    level: int
    count: int
    timestamp: float
|
|
|
|
|
|
|
|
|
@dataclass
class RecordInfo:
    """Metadata describing the recording session.

    Attributes:
        roomid: Room number.
        shortid: Short room number.
        name: Streamer name.
        title: Stream title.
        areanameparent: Parent category name.
        areanamechild: Child category name.
        start_time: Start time of the recording, or ``None`` when the
            source document carries no start time or it cannot be parsed.
    """

    # Field order is part of the positional constructor; do not reorder.
    roomid: int
    shortid: int
    name: str
    title: str
    areanameparent: str
    areanamechild: str
    # Optional because the reader stores None for a missing/invalid value.
    start_time: Optional[datetime]
|
|
|
|
|
|
|
|
|
class DanmakuReader:
    """Reader for danmaku XML files produced by the mikufans recorder.

    The whole document is parsed eagerly by the constructor. The results
    are stored on the instance and exposed through the ``get_*`` methods:

    * ``danmaku_list``   - ``<d>`` elements (comments)
    * ``gift_list``      - ``<gift>`` elements
    * ``superchat_list`` - ``<sc>`` elements
    * ``guard_list``     - ``<guard>`` elements
    * ``record_info``    - the ``<BililiveRecorderRecordInfo>`` element, if any

    Individual records whose attributes cannot be converted are skipped
    rather than failing the whole parse.

    The reader is a context manager; leaving the ``with`` block closes the
    underlying file object (including one supplied by the caller).
    """

    def __init__(self, file: str | TextIO | bytes):
        """Open the source and parse it immediately.

        Args:
            file: One of
                * ``str``   - a filesystem path, opened as UTF-8 text;
                * ``bytes`` - the UTF-8 encoded XML document itself;
                * any object with a ``read()`` method returning the document.

        Raises:
            OSError: ``file`` is a path that cannot be opened.
            UnicodeDecodeError: ``file`` is ``bytes`` that are not valid UTF-8.
            ValueError: the content cannot be read or is not well-formed XML.
        """
        # Parsed data containers.
        self.danmaku_list: List[Danmaku] = []
        self.gift_list: List[Gift] = []
        self.superchat_list: List[SuperChat] = []
        self.guard_list: List[Guard] = []
        self.record_info: Optional[RecordInfo] = None

        if isinstance(file, str):
            self.file = open(file, 'r', encoding='utf-8')
            owns_file = True
        elif isinstance(file, bytes):
            self.file = io.StringIO(file.decode('utf-8'))
            owns_file = True
        else:
            self.file = file
            owns_file = False

        # Parse right away. If that fails, the caller never receives the
        # instance, so a handle opened here must be released here.
        try:
            self._parse_xml()
        except BaseException:
            if owns_file:
                self.file.close()
            raise

    def _parse_xml(self) -> None:
        """Read the whole source and populate every data container.

        Raises:
            ValueError: on malformed XML (message prefixed ``XML解析错误``) or
                on any other failure while reading/parsing (message prefixed
                ``文件读取错误``). The original exception is chained.
        """
        try:
            content = self.file.read()

            # Rewind so the caller can re-read the stream. This is
            # best-effort: pipes and similar streams cannot seek, and that
            # must not turn a successful read into an error.
            rewind = getattr(self.file, 'seek', None)
            if rewind is not None:
                try:
                    rewind(0)
                except (OSError, ValueError):
                    pass

            # A byte-order mark that survived text decoding would make the
            # XML parser reject the document at line 1, column 0.
            if isinstance(content, str):
                content = content.removeprefix('\ufeff')

            root = ET.fromstring(content)

            self._parse_record_info(root)
            self._parse_danmaku(root)
            self._parse_gifts(root)
            self._parse_superchat(root)
            self._parse_guard(root)

        except ET.ParseError as e:
            raise ValueError(f"XML解析错误: {e}") from e
        except Exception as e:
            raise ValueError(f"文件读取错误: {e}") from e

    def _parse_iso_time(self, time_str: str) -> datetime:
        """Parse an ISO-8601 timestamp with any number of fractional digits.

        ``datetime.fromisoformat`` accepts at most microsecond precision
        (and, before Python 3.11, only 3 or 6 fractional digits and no
        ``Z`` suffix). The fractional part is therefore truncated or
        zero-padded to exactly six digits and a trailing ``Z`` is rewritten
        as ``+00:00`` before delegating.

        Args:
            time_str: e.g. ``2021-01-01T12:00:00.1234567+08:00``.

        Returns:
            The parsed ``datetime`` (timezone-aware when an offset is given).

        Raises:
            ValueError: if the normalized text is still not valid ISO format.
        """
        text = time_str.strip()
        if text.endswith(('Z', 'z')):
            text = text[:-1] + '+00:00'

        head, dot, rest = text.partition('.')
        if not dot:
            return datetime.fromisoformat(text)

        # Split "<digits><timezone>" at the first non-digit character
        # instead of assuming a fixed number of fractional digits.
        tz = rest.lstrip('0123456789')
        digits = rest[:len(rest) - len(tz)]
        if not digits:
            # Nothing to normalize; let fromisoformat report the error.
            return datetime.fromisoformat(text)

        microsecond = digits[:6].ljust(6, '0')
        return datetime.fromisoformat(f"{head}.{microsecond}{tz}")

    @staticmethod
    def _int_attr(element: ET.Element, name: str, default: int = 0) -> int:
        """Return attribute ``name`` as int, or ``default`` if missing/invalid."""
        raw = element.get(name)
        if raw is None:
            return default
        try:
            return int(raw)
        except ValueError:
            return default

    def _parse_record_info(self, root: ET.Element) -> None:
        """Populate ``record_info`` from ``<BililiveRecorderRecordInfo>``.

        Leaves ``record_info`` as ``None`` when the element is absent. A
        missing or unparseable ``start_time`` is stored as ``None``, and a
        missing or non-numeric room id is stored as ``0``, so that bad
        metadata does not discard the rest of the document.
        """
        record_element = root.find('BililiveRecorderRecordInfo')
        if record_element is None:
            return

        start_time = None
        start_time_str = record_element.get('start_time', '')
        if start_time_str:
            try:
                start_time = self._parse_iso_time(start_time_str)
            except ValueError:
                # Unrecognized format: keep the rest of the metadata.
                start_time = None

        self.record_info = RecordInfo(
            roomid=self._int_attr(record_element, 'roomid'),
            shortid=self._int_attr(record_element, 'shortid'),
            name=record_element.get('name', ''),
            title=record_element.get('title', ''),
            areanameparent=record_element.get('areanameparent', ''),
            areanamechild=record_element.get('areanamechild', ''),
            start_time=start_time,
        )

    def _parse_danmaku(self, root: ET.Element) -> None:
        """Append one ``Danmaku`` per valid ``<d>`` child of ``root``.

        The ``p`` attribute is a comma-separated list:
        time, type, font size, color, timestamp, pool, user id, user hash.
        Elements with fewer than eight parts or non-numeric parts are skipped.
        """
        for d_element in root.findall('d'):
            p_parts = d_element.get('p', '').split(',')
            if len(p_parts) < 8:
                continue

            try:
                danmaku = Danmaku(
                    time=float(p_parts[0]),
                    type=int(p_parts[1]),
                    fontsize=int(p_parts[2]),
                    color=int(p_parts[3]),
                    timestamp=int(p_parts[4]),
                    pool=int(p_parts[5]),
                    user_id=int(p_parts[6]),
                    user_hash=int(p_parts[7]),
                    content=d_element.text or '',
                    user=d_element.get('user', ''),
                )
            except ValueError:
                # Skip comments whose fields cannot be converted.
                continue
            self.danmaku_list.append(danmaku)

    def _parse_gifts(self, root: ET.Element) -> None:
        """Append one ``Gift`` per valid ``<gift>`` child of ``root``."""
        for gift_element in root.findall('gift'):
            try:
                gift = Gift(
                    # A missing or empty ``ts`` counts as 0.
                    timestamp=float(gift_element.get('ts', None) or 0),
                    user=gift_element.get('user', ''),
                    uid=int(gift_element.get('uid', 0)),
                    giftname=gift_element.get('giftname', ''),
                    giftcount=int(gift_element.get('giftcount', 0)),
                )
            except (ValueError, TypeError):
                # Skip gifts whose fields cannot be converted.
                continue
            self.gift_list.append(gift)

    def _parse_superchat(self, root: ET.Element) -> None:
        """Append one ``SuperChat`` per valid ``<sc>`` child of ``root``."""
        for sc_element in root.findall('sc'):
            try:
                superchat = SuperChat(
                    user=sc_element.get('user', ''),
                    uid=int(sc_element.get('uid', 0)),
                    content=sc_element.text or '',
                    time=int(sc_element.get('time', 0)),
                    price=float(sc_element.get('price', 0.0)),
                    # A missing or empty ``ts`` counts as 0.
                    timestamp=float(sc_element.get('ts', None) or 0),
                )
            except (ValueError, TypeError):
                # Skip SuperChats whose fields cannot be converted.
                continue
            self.superchat_list.append(superchat)

    def _parse_guard(self, root: ET.Element) -> None:
        """Append one ``Guard`` per valid ``<guard>`` child of ``root``."""
        for guard_element in root.findall('guard'):
            try:
                guard = Guard(
                    user=guard_element.get('user', ''),
                    uid=int(guard_element.get('uid', 0)),
                    level=int(guard_element.get('level', 0)),
                    count=int(guard_element.get('count', 0)),
                    # A missing or empty ``ts`` counts as 0.
                    timestamp=float(guard_element.get('ts', None) or 0),
                )
            except (ValueError, TypeError):
                # Skip guard records whose fields cannot be converted.
                continue
            self.guard_list.append(guard)

    # ------------------------------------------------------------------
    # Data access
    # ------------------------------------------------------------------

    def get_danmaku(self) -> List[Danmaku]:
        """Return a shallow copy of all parsed danmaku."""
        return list(self.danmaku_list)

    def get_gifts(self) -> List[Gift]:
        """Return a shallow copy of all parsed gifts."""
        return list(self.gift_list)

    def get_superchat(self) -> List[SuperChat]:
        """Return a shallow copy of all parsed SuperChats."""
        return list(self.superchat_list)

    def get_guard(self) -> List[Guard]:
        """Return a shallow copy of all parsed guard purchases."""
        return list(self.guard_list)

    def get_record_info(self) -> Optional[RecordInfo]:
        """Return the recording metadata, or ``None`` if the file has none."""
        return self.record_info

    def get_danmaku_count(self) -> int:
        """Return the number of parsed danmaku."""
        return len(self.danmaku_list)

    def get_gift_count(self) -> int:
        """Return the number of parsed gifts."""
        return len(self.gift_list)

    def get_superchat_count(self) -> int:
        """Return the number of parsed SuperChats."""
        return len(self.superchat_list)

    def get_guard_count(self) -> int:
        """Return the number of parsed guard purchases."""
        return len(self.guard_list)

    def get_danmaku_by_time_range(self, start_time: float, end_time: float) -> List[Danmaku]:
        """Return danmaku whose ``time`` lies in ``[start_time, end_time]``."""
        return [d for d in self.danmaku_list if start_time <= d.time <= end_time]

    def get_danmaku_by_user(self, user_name: str) -> List[Danmaku]:
        """Return danmaku whose ``user`` equals ``user_name`` exactly."""
        return [d for d in self.danmaku_list if d.user == user_name]

    def get_gifts_by_user(self, user_name: str) -> List[Gift]:
        """Return gifts whose ``user`` equals ``user_name`` exactly."""
        return [g for g in self.gift_list if g.user == user_name]

    # ------------------------------------------------------------------
    # Resource management
    # ------------------------------------------------------------------

    def close(self) -> None:
        """Close the underlying file object if it supports ``close()``.

        Parsed data stays available after closing.
        """
        closer = getattr(self.file, 'close', None)
        if closer is not None:
            closer()

    def __enter__(self):
        """Return the reader itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the underlying file object; exceptions are not suppressed."""
        self.close()