Bilibili 弹幕搜索工具

实现方法

该工具使用 Python 实现,主要用于搜索和解析 Bilibili 视频的弹幕信息。

主要依赖

PyQt5==5.15.11
lxml==5.3.0
requests==2.30.0

核心功能

  1. 随机 User-Agent
import sys
import asyncio
import requests
import re
import datetime
from lxml import etree
from PyQt5.QtWidgets import QApplication, QWidget, QLineEdit, QPushButton, QVBoxLayout, QTableWidget, QTableWidgetItem
from PyQt5.QtCore import QRunnable, QThreadPool, pyqtSignal, QObject
import random
import logging  # 导入 logging 模块

# Configure the root logger: INFO level, each record formatted as
# "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Pool of browser User-Agent strings. get_danmaku_info picks one with
# random.choice for each search, so the request headers vary between searches
# (two consecutive picks can still be identical).
user_agent_list = [
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) Gecko/20100101 Firefox/61.0",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36",
]

# CRC32 section
# Polynomial XOR-ed into the register whenever a 1 bit is shifted out on the
# right (the table is built least-significant-bit first).
CRCPOLYNOMIAL = 0xEDB88320
# 256-entry lookup table; all zeros until create_table() has been called.
crctable = [0] * 256


def create_table():
    """Populate the module-level ``crctable`` in place.

    For every byte value 0..255 the value is pushed through eight
    shift-right steps, XOR-ing ``CRCPOLYNOMIAL`` in whenever the bit shifted
    out is set. The list object itself is kept (entries are overwritten), so
    existing references to ``crctable`` observe the filled table. Calling the
    function more than once simply recomputes the same values.
    """
    for byte_value in range(256):
        register = byte_value
        for _ in range(8):
            low_bit_set = register & 1
            register >>= 1
            if low_bit_set:
                register ^= CRCPOLYNOMIAL
        crctable[byte_value] = register

def crc32(string):
    """Return the running CRC32 register for ``str(string)``.

    The register starts at 0xFFFFFFFF and is updated once per character via
    ``crctable``. No final XOR is applied, so the value returned for an empty
    input is 0xFFFFFFFF. Only the low 8 bits of ``register ^ ord(char)`` select
    the table entry.

    ``crctable`` must have been filled by ``create_table()`` beforehand;
    otherwise every lookup yields 0.

    Args:
        string: Any object; it is converted with ``str()`` before hashing
            (callers in this file pass integers).

    Returns:
        int: The register value after consuming every character.
    """
    # Convert once up front; the previous code rebuilt str(string) on every
    # loop iteration.
    text = str(string)
    crcstart = 0xFFFFFFFF
    for char in text:
        crcstart = (crcstart >> 8) ^ crctable[(crcstart ^ ord(char)) & 255]
    return crcstart

def crc32_last_index(string):
    """Return the ``crctable`` index used for the last character of ``str(string)``.

    Runs the same register update as ``crc32`` but reports the table index
    selected in the final step instead of the register itself.

    Args:
        string: Any object; it is converted with ``str()`` first.

    Returns:
        int: Index in 0..255 chosen for the final character, or -1 when the
        string form of the input is empty (no lookup takes place). -1 is the
        same "not found" sentinel used by ``get_crc_index``.
    """
    # Convert once up front instead of on every loop iteration.
    text = str(string)
    crcstart = 0xFFFFFFFF
    # Pre-bind the result so that an empty input no longer raises
    # UnboundLocalError at the return statement.
    index = -1
    for char in text:
        index = (crcstart ^ ord(char)) & 255
        crcstart = (crcstart >> 8) ^ crctable[index]
    return index

def get_crc_index(t):
    """Find the table position whose entry has ``t`` as its top byte.

    Args:
        t: Value compared against ``crctable[pos] >> 24``.

    Returns:
        int: The lowest position in 0..255 that matches, or -1 when no entry
        matches.
    """
    matching_positions = (pos for pos in range(256) if crctable[pos] >> 24 == t)
    return next(matching_positions, -1)

def deep_check(i, index):
    """Try to recover three trailing decimal digits for candidate prefix ``i``.

    Starting from ``crc32(i)``, three stages are run using ``index[2]``,
    ``index[1]`` and ``index[0]`` in that order. Each stage XORs the low byte
    of the running hash with the stage's table index; the result must be an
    ASCII digit code (48..57), otherwise the candidate is rejected.

    Args:
        i: Candidate prefix, hashed through ``crc32``.
        index: Sequence of table indices; positions 0..2 are read here.

    Returns:
        list: ``[0]`` if any stage produces a non-digit, otherwise
        ``[1, digits]`` where ``digits`` is the 3-character digit string.
    """
    running_hash = crc32(i)
    digits = []
    for position in (2, 1, 0):
        table_index = index[position]
        char_code = (running_hash & 0xff) ^ table_index
        if char_code < 48 or char_code > 57:
            return [0]
        digits.append(str(char_code - 48))
        # Advance the hash past the character that was just recovered.
        running_hash = crctable[table_index] ^ (running_hash >> 8)
    return [1, "".join(digits)]

def parse_crc(user_id):
    """Search for a decimal ID whose CRC32 matches the hex hash ``user_id``.

    The hash is first decomposed into four ``crctable`` indices. Candidate
    prefixes 0..99,999,999 are then tried in order; a prefix is accepted when
    its last table index equals ``index[3]`` and ``deep_check`` can recover
    three trailing digits. The result is always the prefix followed by
    exactly three digits.

    ``crctable`` must have been filled by ``create_table()`` beforehand.

    NOTE: this is a pure-Python brute force of up to 100,000,000 iterations
    and can take a long time for hashes that match late or not at all.

    Args:
        user_id: Hash as a hexadecimal string without the ``0x`` prefix.

    Returns:
        str | int: The recovered ID as a string of digits, or -1 when the
        input is not a hexadecimal value that fits in 32 bits or when no
        candidate matches.
    """
    try:
        ht = int(f"0x{user_id}", 16) ^ 0xffffffff
    except ValueError:
        # Not valid hexadecimal: report failure with the usual sentinel
        # instead of propagating the exception to the caller.
        return -1
    if ht > 0xffffffff:
        # Wider than 32 bits: get_crc_index would return -1 and the code
        # below would silently read crctable[-1].
        return -1

    # Peel the hash apart one byte at a time, most significant byte first.
    index = [0] * 4
    for i in range(3, -1, -1):
        index[3 - i] = get_crc_index(ht >> (i * 8))
        snum = crctable[index[3 - i]]
        ht ^= snum >> ((3 - i) * 8)

    for i in range(100000000):
        if crc32_last_index(i) != index[3]:
            continue
        deepCheckData = deep_check(i, index)
        if deepCheckData[0]:
            return f"{i}{deepCheckData[1]}"
    return -1

# PyQt GUI section
class WorkerSignals(QObject):
    """Signals through which a Worker reports its outcome."""

    # Emitted by Worker.run with the value returned by the wrapped coroutine.
    result = pyqtSignal(object)
    # Emitted by Worker.run with str(exception) when the wrapped call raises.
    error = pyqtSignal(str)

class Worker(QRunnable):
    """Runnable that drives a coroutine function to completion on its own event loop.

    The outcome is reported through ``self.signals``: ``result`` carries the
    coroutine's return value, ``error`` carries ``str(exception)``.
    """

    def __init__(self, fn, *args, **kwargs):
        """Store the coroutine function and the arguments to call it with.

        Args:
            fn: Callable returning an awaitable; invoked in ``run``.
            *args: Positional arguments forwarded to ``fn``.
            **kwargs: Keyword arguments forwarded to ``fn``.
        """
        super(Worker, self).__init__()
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()

    def run(self):
        """Execute ``fn`` on a fresh event loop and emit ``result`` or ``error``."""
        try:
            loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(loop)
                result = loop.run_until_complete(self.fn(*self.args, **self.kwargs))
            finally:
                # Always release the loop, including when the coroutine
                # raises; previously close() was skipped on failure. Also
                # unset it so the thread is not left holding a closed loop.
                asyncio.set_event_loop(None)
                loop.close()
            self.signals.result.emit(result)
        except Exception as e:
            self.signals.error.emit(str(e))

class BilibiliSearchApp(QWidget):
    """Main window: looks up a video's danmaku by BV id and lists them in a table.

    Network requests run on a ``QThreadPool`` via ``Worker``; results and
    errors come back through the worker's signals and are rendered by
    ``display_result`` / ``display_error``.
    """

    # Column titles of the result table, in display order.
    RESULT_HEADERS = ["出现时间", "原始用户ID", "弹幕内容", "发送时间", "用户ID", "解析操作"]

    def __init__(self):
        """Create the thread pool, fill the CRC lookup table and build the UI."""
        super().__init__()
        self.threadpool = QThreadPool()
        create_table()  # Initialise the CRC lookup table
        self.initUI()

    def initUI(self):
        """Build the widgets: two inputs, the search button and the result table."""
        self.setWindowTitle('Bilibili Danmaku Search')
        self.setGeometry(300, 300, 900, 400)

        layout = QVBoxLayout()

        self.search_input = QLineEdit(self)
        self.search_input.setPlaceholderText("输入BV号")
        layout.addWidget(self.search_input)

        self.danmaku_input = QLineEdit(self)
        self.danmaku_input.setPlaceholderText("输入弹幕关键词(可选)")
        layout.addWidget(self.danmaku_input)

        self.search_button = QPushButton('搜索', self)
        self.search_button.clicked.connect(self.search)
        layout.addWidget(self.search_button)

        self.result_table = QTableWidget(self)
        self._reset_result_columns()
        layout.addWidget(self.result_table)

        self.setLayout(layout)

    def _reset_result_columns(self):
        """Put the table back into its six-column result layout."""
        self.result_table.setColumnCount(len(self.RESULT_HEADERS))
        self.result_table.setHorizontalHeaderLabels(self.RESULT_HEADERS)

    def search(self):
        """Start a background danmaku lookup for the BV id typed by the user.

        Does nothing when the BV id field is empty. The search button stays
        disabled until ``display_result`` or ``display_error`` runs.
        """
        # Strip surrounding whitespace so a pasted " BVxxx " does not end up
        # inside the request URL.
        bvid = self.search_input.text().strip()
        keyword = self.danmaku_input.text().strip()
        if not bvid:
            return

        logging.info(f"开始搜索 BV号: {bvid},关键词: {keyword}")
        self.search_button.setEnabled(False)
        self.result_table.setRowCount(0)
        worker = Worker(self.get_danmaku_info, bvid, keyword)
        worker.signals.result.connect(self.display_result)
        worker.signals.error.connect(self.display_error)
        self.threadpool.start(worker)

    async def get_danmaku_info(self, bvid, keyword=None):
        """Fetch the danmaku of video ``bvid`` and return them sorted by appearance time.

        The ``requests`` calls inside are blocking; the coroutine is meant to
        be run by ``Worker`` on a pool thread, not awaited on the GUI thread.

        Args:
            bvid: BV id of the video.
            keyword: Optional substring; when truthy only danmaku whose text
                contains it are kept.

        Returns:
            list[dict]: One dict per danmaku with keys ``dm_time`` (float),
            ``send_time`` (int), ``original_user_id`` (str) and ``text``
            (str), sorted by ``dm_time``.

        Raises:
            RuntimeError: When a request fails or no cid can be found. The
                message is user-facing; ``Worker`` forwards it through its
                ``error`` signal.
        """
        oid_url = f"https://api.bilibili.com/x/player/pagelist?bvid={bvid}"
        headers = {
            "User-Agent": random.choice(user_agent_list),
            "referer": f"https://www.bilibili.com/video/{bvid}"
        }

        try:
            logging.info(f"请求视频信息,URL: {oid_url}")
            response = requests.get(oid_url, headers=headers, timeout=5)
            response.raise_for_status()
            logging.info("成功获取视频信息")
        except requests.RequestException as e:
            logging.error(f"请求失败: {e}")
            # This widget has no `signals` attribute (the old code tried to
            # emit on it and crashed with AttributeError). Raising lets
            # Worker.run deliver the message through its own error signal.
            raise RuntimeError("请求失败,请检查网络连接或输入是否正确。") from e

        cids = re.findall(r'"cid":(.*?),', response.text, re.S)
        if not cids:
            # E.g. an error payload for an unknown BV id contains no cid.
            logging.error("响应中未找到 cid")
            raise RuntimeError("未找到视频信息,请检查BV号是否正确。")
        oid = cids[0]
        danmu_url = f"https://api.bilibili.com/x/v1/dm/list.so?oid={oid}"

        try:
            logging.info(f"请求弹幕信息,URL: {danmu_url}")
            response = requests.get(danmu_url, headers=headers, timeout=5)
            response.raise_for_status()
            logging.info("成功获取弹幕信息")
        except requests.RequestException as e:
            logging.error(f"获取弹幕失败: {e}")
            raise RuntimeError("获取弹幕失败,请稍后再试。") from e

        html = etree.HTML(response.content)
        d_elements = html.xpath("//d") if html is not None else []

        dms = []
        for d in d_elements:
            # The `p` attribute is a comma-separated record; field 0 is the
            # appearance time, field 4 the send timestamp and field 6 the
            # hashed user id.
            p_attr_parts = (d.attrib.get('p') or '').split(',')
            try:
                dm_time = float(p_attr_parts[0])
                send_timestamp = int(p_attr_parts[4])
                original_user_id = p_attr_parts[6]
            except (IndexError, ValueError):
                # Skip a malformed entry instead of failing the whole search.
                logging.warning("跳过格式异常的弹幕: %s", d.attrib.get('p'))
                continue

            dms.append({
                'dm_time': dm_time,
                'send_time': send_timestamp,
                'original_user_id': original_user_id,
                # An empty <d/> element has text None; normalise to '' so the
                # keyword filter and the table cell can handle it.
                'text': d.text or ''
            })

        # Filter by keyword
        logging.info(f"过滤弹幕,关键词: {keyword}")
        if keyword:
            dms = [dm for dm in dms if keyword in dm['text']]
            logging.info(f"过滤后剩余弹幕数量: {len(dms)}")

        # Sort by appearance time
        logging.info("按照时间排序弹幕")
        dms.sort(key=lambda dm: dm['dm_time'])

        return dms

    def display_result(self, danmaku_info):
        """Fill the table with one row per danmaku and re-enable the search button.

        Args:
            danmaku_info: List of dicts as returned by ``get_danmaku_info``.
        """
        logging.info(f"显示结果,弹幕数量: {len(danmaku_info)}")
        # A previous display_error call may have shrunk the table to a single
        # column; restore the normal layout before filling it.
        self._reset_result_columns()
        self.result_table.setRowCount(len(danmaku_info))

        for row, dm in enumerate(danmaku_info):
            self.result_table.setItem(row, 0, QTableWidgetItem(f"{dm['dm_time']}s"))
            self.result_table.setItem(row, 1, QTableWidgetItem(dm['original_user_id']))
            self.result_table.setItem(row, 2, QTableWidgetItem(dm['text'] or ''))
            self.result_table.setItem(row, 3, QTableWidgetItem(self.convert_timestamp_to_hms(dm['send_time'])))

            # Per-row button that resolves the hashed id on demand; uid and
            # row are bound as lambda defaults to avoid late binding.
            parse_button = QPushButton("解析CRC")
            parse_button.clicked.connect(lambda checked, uid=dm['original_user_id'], row=row: self.parse_crc(uid, row))
            self.result_table.setCellWidget(row, 5, parse_button)

        self.search_button.setEnabled(True)

    def parse_crc(self, user_id, row):
        """Resolve the hashed ``user_id`` and show the outcome in column 4 of ``row``.

        NOTE(review): the module-level ``parse_crc`` brute force runs
        synchronously here, i.e. on the thread that delivered the button
        click, so the window does not respond until it returns.
        """
        crc_result = parse_crc(user_id)
        self.result_table.setItem(row, 4, QTableWidgetItem(str(crc_result)))

    def convert_timestamp_to_hms(self, timestamp):
        """Format a Unix timestamp as ``YYYY-MM-DD HH:MM:SS`` in local time."""
        return datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')

    def display_error(self, error_msg):
        """Replace the table content with a single cell showing ``error_msg``."""
        logging.error(f"发生错误: {error_msg}")
        self.result_table.setRowCount(0)  # drop any existing rows and cell widgets
        self.result_table.setColumnCount(1)
        # The row must exist before setItem can place the message in it; with
        # a row count of 0 the item was silently discarded.
        self.result_table.setRowCount(1)
        self.result_table.setHorizontalHeaderLabels(["发生错误"])
        self.result_table.setItem(0, 0, QTableWidgetItem(error_msg))
        self.search_button.setEnabled(True)

def main():
    """Create the Qt application, show the search window and exit with the event loop's status."""
    qt_app = QApplication(sys.argv)
    window = BilibiliSearchApp()
    window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)

# Script entry point.
if __name__ == '__main__':
    # Initialise the CRC lookup table (BilibiliSearchApp.__init__ calls
    # create_table() as well).
    create_table()
    main()

主要组件

  1. CRC32 算法实现

    • 创建查找表
    • CRC32 计算
    • 深度检查
    • CRC 解析
  2. PyQt 界面实现

    • 工作线程管理
    • 异步操作处理
    • 界面布局设计

功能特点

  • 支持 BV 号搜索
  • 支持弹幕关键词过滤
  • 异步处理请求
  • 线程池管理
  • 日志记录功能
  • CRC32 解析功能

界面组件

  • 搜索输入框(BV号)
  • 弹幕关键词输入框
  • 搜索按钮
  • 结果显示表格
    • 出现时间
    • 原始用户ID
    • 弹幕内容
    • 发送时间
    • 用户ID
    • 解析操作

用户定位

用户可以通过以下URL访问用户空间:

https://space.bilibili.com/用户ID

重要说明

关于 UID 返回结果的准确性:

弹幕文件中的 mid 并不是原始 UID,而是对 UID 做 CRC32 校验后转为 16 进制得到的哈希值,一般情况下无法直接逆向。虽然存在反推算法,但需要注意以下几点:

  1. 算法作者声明 "Sometimes the results are inaccurate"
  2. B站最新的 16 位 mid 以及超过 10 位的 mid 被加密后无法正常反推
  3. 8、9位 UID 的返回结果基本正确
  4. 不同 mid 可能会计算成同一个哈希值,数据越大撞库可能性越大

免责声明:此程序只能保证大部分用户发送的弹幕中 mid_hash 逆向结果是正确的。若需查证成分需进一步核实,因程序造成的误伤后果自负!

运行方法

  1. 安装依赖:
pip install -r requirements.txt
  2. 运行程序:
if __name__ == '__main__':
    create_table()  # 初始化 CRC 查找表
    main()