You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

302 lines
10 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
mikufans录播姬弹幕XML文件读取器
支持解析mikufans录播姬生成的弹幕XML文件包括
- 弹幕数据
- 礼物数据
- SuperChat数据
- 舰长购买数据
- 录制信息
使用示例:
# 从文件读取
with DanmakuReader('danmaku.xml') as reader:
danmaku_list = reader.get_danmaku()
gift_list = reader.get_gifts()
# 从字符串读取
reader = DanmakuReader(xml_content.encode('utf-8'))
print(f"弹幕数量: {reader.get_danmaku_count()}")
"""
import io
import xml.etree.ElementTree as ET
from typing import TextIO, List, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Danmaku:
    """One danmaku (on-screen comment) record.

    Attributes:
        time: Moment the danmaku appears, in seconds.
        type: Danmaku type (1 = scrolling, 5 = top, 4 = bottom).
        fontsize: Font size.
        color: Color.
        timestamp: Timestamp.
        pool: Danmaku pool.
        user_id: ID of the sending user.
        user_hash: Hash of the sending user.
        content: Text of the danmaku.
        user: Name of the sending user.
    """

    time: float
    type: int
    fontsize: int
    color: int
    timestamp: int
    pool: int
    user_id: int
    user_hash: int
    content: str
    user: str
@dataclass
class Gift:
    """One gift record.

    Attributes:
        timestamp: Timestamp.
        user: Name of the user who sent the gift.
        uid: ID of the user who sent the gift.
        giftname: Name of the gift.
        giftcount: Number of gifts sent.
    """

    timestamp: float
    user: str
    uid: int
    giftname: str
    giftcount: int
@dataclass
class SuperChat:
    """One SuperChat record.

    Attributes:
        user: Name of the sending user.
        uid: ID of the sending user.
        content: Text of the SuperChat.
        time: Display duration.
        price: Price.
        timestamp: Timestamp.
    """

    user: str
    uid: int
    content: str
    time: int
    price: float
    timestamp: float
@dataclass
class Guard:
    """One guard ("boarding") purchase record.

    Attributes:
        user: Name of the purchasing user.
        uid: ID of the purchasing user.
        level: Guard level.
        count: Number of units purchased.
        timestamp: Timestamp.
    """

    user: str
    uid: int
    level: int
    count: int
    timestamp: float
@dataclass
class RecordInfo:
    """Recording metadata.

    Attributes:
        roomid: Room number.
        shortid: Short room number.
        name: Streamer name.
        title: Stream title.
        areanameparent: Parent area (category) name.
        areanamechild: Child area (sub-category) name.
        start_time: Start time of the recording, or ``None`` when it is
            missing or could not be parsed.
    """

    roomid: int
    shortid: int
    name: str
    title: str
    areanameparent: str
    areanamechild: str
    # Optional because the reader stores None when the timestamp is absent
    # or malformed; the default keeps positional construction unchanged.
    start_time: Optional[datetime] = None
class DanmakuReader:
def __init__(self, file: str | TextIO | bytes):
if isinstance(file, str):
self.file = open(file, 'r', encoding='utf-8')
elif isinstance(file, bytes):
self.file = io.StringIO(file.decode('utf-8'))
else:
self.file = file
# 存储解析后的数据
self.danmaku_list: List[Danmaku] = []
self.gift_list: List[Gift] = []
self.superchat_list: List[SuperChat] = []
self.guard_list: List[Guard] = []
self.record_info: Optional[RecordInfo] = None
# 自动解析XML
self._parse_xml()
def _parse_xml(self):
"""解析XML文件"""
try:
content = self.file.read()
# 重置文件指针
if hasattr(self.file, 'seek'):
self.file.seek(0)
# 解析XML
root = ET.fromstring(content)
# 解析录制信息
self._parse_record_info(root)
# 解析各种数据
self._parse_danmaku(root)
self._parse_gifts(root)
self._parse_superchat(root)
self._parse_guard(root)
except ET.ParseError as e:
raise ValueError(f"XML解析错误: {e}")
except Exception as e:
raise ValueError(f"文件读取错误: {e}")
def _parse_iso_time(self, time_str: str) -> datetime:
# 只保留小数点后6位
if '.' in time_str:
date_part, rest = time_str.split('.', 1)
microsecond = rest[:6]
tz = rest[7:] # 跳过6位微秒和1位时区分隔符
s_fixed = f"{date_part}.{microsecond}{tz}"
else:
s_fixed = time_str
return datetime.fromisoformat(s_fixed)
def _parse_record_info(self, root: ET.Element):
"""解析录制信息"""
record_element = root.find('BililiveRecorderRecordInfo')
if record_element is not None:
start_time_str = record_element.get('start_time', '')
start_time = None
if start_time_str:
try:
# 尝试解析为标准时间格式
start_time = self._parse_iso_time(start_time_str)
except ValueError:
pass
self.record_info = RecordInfo(
roomid=int(record_element.get('roomid', 0)),
shortid=int(record_element.get('shortid', 0)),
name=record_element.get('name', ''),
title=record_element.get('title', ''),
areanameparent=record_element.get('areanameparent', ''),
areanamechild=record_element.get('areanamechild', ''),
start_time=start_time
)
def _parse_danmaku(self, root: ET.Element):
"""解析弹幕数据"""
for d_element in root.findall('d'):
try:
# 解析p属性格式时间,类型,字号,颜色,时间戳,池,用户ID,用户哈希
p_attr = d_element.get('p', '')
if not p_attr:
continue
p_parts = p_attr.split(',')
if len(p_parts) < 8:
continue
danmaku = Danmaku(
time=float(p_parts[0]),
type=int(p_parts[1]),
fontsize=int(p_parts[2]),
color=int(p_parts[3]),
timestamp=int(p_parts[4]),
pool=int(p_parts[5]),
user_id=int(p_parts[6]),
user_hash=int(p_parts[7]),
content=d_element.text or '',
user=d_element.get('user', '')
)
self.danmaku_list.append(danmaku)
except (ValueError, IndexError) as e:
# 跳过解析错误的弹幕
continue
def _parse_gifts(self, root: ET.Element):
"""解析礼物数据"""
for gift_element in root.findall('gift'):
try:
gift = Gift(
timestamp=float(gift_element.get('ts', None) or 0),
user=gift_element.get('user', ''),
uid=int(gift_element.get('uid', 0)),
giftname=gift_element.get('giftname', ''),
giftcount=int(gift_element.get('giftcount', 0))
)
self.gift_list.append(gift)
except (ValueError, TypeError) as e:
# 跳过解析错误的礼物
continue
def _parse_superchat(self, root: ET.Element):
"""解析SuperChat数据"""
for sc_element in root.findall('sc'):
try:
superchat = SuperChat(
user=sc_element.get('user', ''),
uid=int(sc_element.get('uid', 0)),
content=sc_element.text or '',
time=int(sc_element.get('time', 0)),
price=float(sc_element.get('price', 0.0)),
timestamp=float(sc_element.get('ts', None) or 0)
)
self.superchat_list.append(superchat)
except (ValueError, TypeError) as e:
# 跳过解析错误的SC
continue
def _parse_guard(self, root: ET.Element):
"""解析舰长数据"""
for guard_element in root.findall('guard'):
try:
guard = Guard(
user=guard_element.get('user', ''),
uid=int(guard_element.get('uid', 0)),
level=int(guard_element.get('level', 0)),
count=int(guard_element.get('count', 0)),
timestamp=float(guard_element.get('ts', None) or 0)
)
self.guard_list.append(guard)
except (ValueError, TypeError) as e:
# 跳过解析错误的舰长数据
continue
# 数据访问方法
def get_danmaku(self) -> List[Danmaku]:
"""获取所有弹幕数据"""
return self.danmaku_list.copy()
def get_gifts(self) -> List[Gift]:
"""获取所有礼物数据"""
return self.gift_list.copy()
def get_superchat(self) -> List[SuperChat]:
"""获取所有SuperChat数据"""
return self.superchat_list.copy()
def get_guard(self) -> List[Guard]:
"""获取所有舰长数据"""
return self.guard_list.copy()
def get_record_info(self) -> Optional[RecordInfo]:
"""获取录制信息"""
return self.record_info
def get_danmaku_count(self) -> int:
"""获取弹幕数量"""
return len(self.danmaku_list)
def get_gift_count(self) -> int:
"""获取礼物数量"""
return len(self.gift_list)
def get_superchat_count(self) -> int:
"""获取SuperChat数量"""
return len(self.superchat_list)
def get_guard_count(self) -> int:
"""获取舰长数量"""
return len(self.guard_list)
def get_danmaku_by_time_range(self, start_time: float, end_time: float) -> List[Danmaku]:
"""获取指定时间范围内的弹幕"""
return [d for d in self.danmaku_list if start_time <= d.time <= end_time]
def get_danmaku_by_user(self, user_name: str) -> List[Danmaku]:
"""获取指定用户的弹幕"""
return [d for d in self.danmaku_list if d.user == user_name]
def get_gifts_by_user(self, user_name: str) -> List[Gift]:
"""获取指定用户的礼物"""
return [g for g in self.gift_list if g.user == user_name]
def __enter__(self):
"""支持with语句"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""清理资源"""
if hasattr(self.file, 'close'):
self.file.close()