Rendering

  • The screenshot above shows popular live broadcasts on the Zhanqi platform on the night of January 22.

    Each line reads: live room title -[sender's nickname]- bullet-screen content




Rendering 2


Open source packages

  • I first learned about live-stream bullet-screen crawlers from a Ruby bullet-screen crawler for Douyu TV. Its author also wrote a detailed analysis of how it works, "Notes on writing a Douyu TV bullet-screen crawler (Ruby version)". He later made a Python version as well, but that one only works on Mac and Linux.
  • Later, I found littlecodersh's open-source Python bullet-screen package on GitHub. It has no operating-system dependency and supports more live-streaming platforms: it provides bullet-screen authentication, connection and message-parsing code for Douyu, Panda, Zhanqi, Quanmin and Bilibili. The code is properly decoupled and clearly structured.

How it works

  • First, the anchor information, the bullet-screen server information and the bullet-screen authentication server information are obtained from the live room page or the corresponding API. Then a socket connection is opened, kept alive with heartbeat packets, and used to receive bullet-screen data continuously. All of this is done with threads. Take littlecodersh's code as an example. From the live room URL provided by the user, it starts two threads:
    - a bullet-screen message processing thread, which receives messages that are already parsed and structured, so it is effectively an interface;
    - a client thread for the corresponding platform.
    The client thread initializes a socket and maintains two child threads that share it. One child thread sends heartbeat packets and the other receives the raw bullet-screen data. The receiving thread also parses and structures each message and puts it into a queue. The processing thread started at the beginning takes messages from that queue for further handling. To monitor several rooms you have to start several processes. Alternatively, you can modify the source to start additional client threads and handle all rooms in a single process.
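The thread layout described above can be sketched as follows. This is a minimal illustration, not littlecodersh's actual code, and it rests on several assumptions:
- socket.socketpair() stands in for the real bullet-screen server.
- The b'PING\n' heartbeat and the newline-delimited messages are made-up placeholders for a real platform protocol.
- The function names are hypothetical.
- The main thread plays the part of the message-processing thread.

```python
import queue
import socket
import threading


def heartbeat_worker(sock, stop, interval=0.05):
    """Child thread 1: send a placeholder heartbeat on the shared socket."""
    while not stop.is_set():
        sock.sendall(b'PING\n')
        stop.wait(interval)


def receive_worker(sock, messages, stop):
    """Child thread 2: read raw bytes, parse them, queue structured messages."""
    buffer = b''
    while not stop.is_set():
        try:
            chunk = sock.recv(1024)
        except socket.timeout:
            continue  # no data yet; re-check the stop flag
        if not chunk:  # the peer closed the connection
            break
        buffer += chunk
        while b'\n' in buffer:
            line, buffer = buffer.split(b'\n', 1)
            messages.put({'content': line.decode('utf8', 'ignore')})


def client_thread(sock, messages, stop):
    """Client thread: own one socket and start two child threads sharing it."""
    sock.settimeout(0.1)  # lets receive_worker notice `stop` periodically
    workers = [
        threading.Thread(target=heartbeat_worker, args=(sock, stop), daemon=True),
        threading.Thread(target=receive_worker, args=(sock, messages, stop),
                         daemon=True),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()


def demo():
    """Push two fake messages through the pipeline and collect them."""
    client_end, server_end = socket.socketpair()
    messages = queue.Queue()
    stop = threading.Event()
    client = threading.Thread(target=client_thread,
                              args=(client_end, messages, stop), daemon=True)
    client.start()
    server_end.sendall(b'hello\nworld\n')  # pretend the server pushed two lines
    received = [messages.get(timeout=2)['content'] for _ in range(2)]
    stop.set()
    client.join()
    client_end.close()
    server_end.close()
    return received


if __name__ == '__main__':
    print(demo())  # ['hello', 'world']
```

Each extra room in this model costs one client thread plus two child threads. That per-room thread cost is what the asynchronous rewrite below tries to avoid.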

Application scenario and an asynchronous rewrite attempt

  • The application goal is to collect highlight clips from popular programs on a live-streaming platform, so the first question is how to recognize a highlight. A sudden surge in the number of bullet-screen messages is, to a large extent, a signal that something special is happening in the stream, although it may also just mean the anchor is running a giveaway. Highlights can therefore be pre-screened by monitoring bullet-screen traffic in real time and finding the points where it changes sharply. The same bullet-screen data can also be used to extract the audience's current hot words for further processing.

    This requires receiving hundreds of live rooms on one platform at the same time, or even more across several platforms, and for Python asynchronous coroutines may be a better fit than threads. So I rewrote littlecodersh's threaded code as asynchronous code (thanks, littlecodersh). A single event loop manages all the asynchronous socket events, each socket receives the bullet screen of one live room, and the code structure feels a little easier to follow. In a test receiving 200 rooms simultaneously, memory usage was about 30 MB and traffic was about 400 Kb/s. The Python interpreter itself accounts for about 10 MB of that memory. I have only rewritten the Zhanqi platform code as a trial. Many parts are handled roughly and there is plenty of room for improvement, so suggestions and discussion are welcome.
  • First, get the currently popular live rooms from the Zhanqi live list page. The following code scrapes the hot list and appends it to a TXT file; only the first page is scraped here.
    import requests
    from bs4 import BeautifulSoup

    r = requests.get('https://www.zhanqi.tv/lives')
    soup = BeautifulSoup(r.content, "lxml")  # "html.parser" also works if lxml is not installed
    urlist = [i.get('href') for i in soup.select("#hotList li a")]
    with open('urlist3.txt', 'a') as f:
        for i in urlist:
            # The hrefs are site-relative, so prepend the domain
            f.write(''.join(['https://www.zhanqi.tv', i, '\n']))
  • Then connect to all the rooms in the TXT file with the following code; run it with python zhanqidanmu.py.

    After the socket receives raw bullet-screen data, the data needs time-consuming parsing, which is done in the msgHandleBlock method. That work is handed to another thread through a thread pool. A process pool is also possible by switching ThreadPoolExecutor to ProcessPoolExecutor. In that case the function and its arguments must be picklable, and a bound method of a client that holds a socket and an event loop is not. Parsed messages are simply printed to the console here. You can change this to write them straight into a database or to expose them through an interface.
# zhanqidanmu.py
import abc
import asyncio
import socket
import concurrent.futures

import sys
import json
import time
import re
import base64
from struct import pack
import requests

USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36'


async def testMemory():
    """Print this process's memory usage (RSS, in MB) every 10 seconds."""
    import os
    import psutil
    while True:
        process = psutil.Process(os.getpid())
        print(os.getpid(), 'footprint',
              str(process.memory_info().rss / 1024 / 1024))
        await asyncio.sleep(10)


class DanMuClientManager(object):

    def __init__(self, loop=None, executor=None):
        # A private event loop that drives all the sockets
        self.loop = loop or asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        # A private thread pool for CPU-intensive parsing work
        self.executor = executor or concurrent.futures.ThreadPoolExecutor(
            max_workers=2)
        # Read the list of live rooms to connect to
        self._urltextProcess()

    def _urltextProcess(self):
        with open('urlist3.txt', 'r') as f:
            self.url = f.readlines()

    def start(self):

        # Instantiate one client per URL in the list, skipping blank lines
        clientList = [ZhanQiDanMuClient(url.strip(), self.loop, self.executor)
                      for url in self.url if url.strip()]
        initTasks = []
        clients = []
        for c in clientList:
            try:
                # Check that the room is live, then get the bullet-screen server and room info
                danmuSocketInfo, roomInfo = c.prepare_env()
            except Exception:
                print("Anchor is not online:", c.url)
            else:
                clients.append(c)
                # Queue the initial connection coroutine of every socket
                initTasks.append(c.init_socket(danmuSocketInfo, roomInfo))
        # Wait until all connections are established
        self.loop.run_until_complete(asyncio.gather(*initTasks))
        print('Connected to bullet-screen servers:', len(initTasks))
        # Heartbeat and receive coroutines for every client, plus the memory monitor
        danmuTasks = [self.loop.create_task(testMemory())]
        for c in clients:
            danmuTasks.extend([
                self.loop.create_task(c.heartCoro()),
                self.loop.create_task(c.danmuCoro()),
            ])
        try:
            # Keep receiving; return_exceptions stops one failed room from ending the others
            self.loop.run_until_complete(
                asyncio.gather(*danmuTasks, return_exceptions=True))
        except KeyboardInterrupt:
            print('off')
        finally:
            # print(">> Cancelling tasks now")
            # for task in asyncio.Task.all_tasks():
            # task.cancel()
            # self.loop.run_until_complete(asyncio.sleep(1))
            # print(">> Done cancelling tasks")
            self.loop.close()


class AbstractDanMuClient(metaclass=abc.ABCMeta):
    """Main flow: check that the room is live, get the bullet-screen server
    address and room information, open and authenticate the socket connection,
    then keep sending heartbeat packets and receiving bullet-screen messages."""

    def __init__(self, url, loop, executor):
        self.url = url
        self.loop = loop
        self.executor = executor
        self.sock = None

    @abc.abstractmethod
    def _get_live_status(self):
        """Get the anchor's live status from the live room web page."""
        return False

    @abc.abstractmethod
    def _prepare_env(self):
        """Return the bullet-screen server (IP, port) and the room info used for authentication."""
        return ('0.0.0.0', 80), {}  # danmuSocketInfo, roomInfo

    def prepare_env(self):
        """Call self._get_live_status and self._prepare_env to do the preparation."""
        if not self._get_live_status():
            raise Exception("Live broadcast has not started")
        return self._prepare_env()

    @abc.abstractmethod
    async def _init_socket(self, roomInfo):
        """Room-specific login on the connected socket, overridden by subclasses;
        send data with await self.loop.sock_sendall(...)."""
        pass

    async def init_socket(self, danmuSocketInfo, roomInfo):
        """Create a non-blocking socket, connect it, then call self._init_socket."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setblocking(False)
        try:
            await self.loop.sock_connect(self.sock, danmuSocketInfo)
            # ConnectionRefusedError
        except Exception as e:
            print(e)
        else:
            await self._init_socket(roomInfo)

    @abc.abstractmethod
    async def heartCoro(self):
        """Send a heartbeat packet every few seconds to keep the TCP connection alive."""
        pass

    async def danmuCoro(self):
        """Receive raw bullet-screen data asynchronously and hand each chunk to
        self.msgHandleBlock in the executor; data only flows one way, so the
        worker never needs to report back."""
        while True:
            # The nbytes argument of loop.sock_recv() cannot be omitted
            content = await self.loop.sock_recv(self.sock, 1024)
            if not content:  # empty bytes: the server closed the connection
                break
            self.loop.run_in_executor(self.executor,
                                      self.msgHandleBlock, content)

    @abc.abstractmethod
    def msgHandleBlock(self, content):
        """Blocking, time-consuming parsing of raw bullet-screen data."""
        pass


def pp(msg):
    # Drop characters the console's encoding cannot display
    encoding = sys.stdout.encoding or 'utf-8'
    print(msg.encode(encoding, 'ignore').decode(encoding))


class ZhanQiDanMuClient(AbstractDanMuClient):
    # Other platforms can be adapted in the same way
    def _get_live_status(self):
        # Room path: the last URL segment, or the one before it if the URL ends in '/'
        url = 'https://www.zhanqi.tv/' + \
              (self.url.split('/')[-1] or self.url.split('/')[-2])
        r = requests.get(url, headers={'User-Agent': USER_AGENT})
        if r.url == 'https://www.zhanqi.tv/':
            return False
        rawJson = re.findall(r'oRoom = (.*);[\s\S]*?window.', r.text)
        if not rawJson:
            rawJson = re.findall(r'aVideos = (.*);[\s\S]*?oPageConfig.', r.text)
        self.roomInfo = json.loads(rawJson[0])
        # if isinstance(self.roomInfo, list):
        # self.roomInfo = self.roomInfo[0]
        # print(self.roomInfo['title'])
        return self.roomInfo['status'] == '4'

    def _prepare_env(self):
        serverAddress = json.loads(base64.b64decode(
            self.roomInfo['flashvars']['Servers']).decode('ascii'))['list'][0]
        serverAddress = (serverAddress['ip'], serverAddress['port'])
        url = '%s/api/public/room.viewer' % 'https://www.zhanqi.tv'
        params = {
            'uid': self.roomInfo['uid'], '_t': int(time.time() / 60), }
        roomInfo = requests.get(url, params).json()
        roomInfo['id'] = int(self.roomInfo['id'])
        # print(serverAddress, roomInfo)
        return serverAddress, roomInfo

    async def _init_socket(self, roomInfo):
        data = {
            'nickname': '', 'roomid': int(roomInfo['id']),
            'gid': roomInfo['data']['gid'], 'sid': roomInfo['data']['sid'],
            'ssid': roomInfo['data']['sid'],
            'timestamp': roomInfo['data']['timestamp'],
            'cmdid': 'loginreq', 'develop_date': '2015-06-07',
            'fhost': 'zhanqi.tool', 'fx': 0, 't': 0, 'thirdacount': '',
            'uid': 0, 'ver': 2, 'vod': 0,
        }
        # Compact JSON (no spaces after separators)
        data = json.dumps(data, separators=(',', ':'))
        await self.loop.sock_sendall(self.sock,
                                     b'\xbb\xcc' + b'\x00' * 4 +
                                     pack('i', len(data)) + b'\x10\x27' +
                                     data.encode('ascii'))

    async def heartCoro(self):
        while True:
            await self.loop.sock_sendall(self.sock,
                                         b'\xbb\xcc' +
                                         b'\x00' * 8 + b'\x59\x27')
            await asyncio.sleep(3)

    def msgHandleBlock(self, content):
        for msg in re.findall(b'\x10\x27({[^\x00]*})\x0a', content):
            try:
                msg = json.loads(msg.decode('utf8', 'ignore'))
                msg['NickName'] = (msg.get('fromname', '') or
                                   msg.get('data', {}).get('nickname', ''))
                msg['Content'] = msg.get('content', '')
                if 'chatm' in msg.get('cmdid', ''):
                    msg['MsgType'] = 'danmu'
                    # Room title padded with '-' to 30 characters, then [nickname]-content
                    pp("{0:-<30}[{1}]-{2}".format(
                        self.roomInfo['title'],
                        msg['NickName'], msg['Content']))
                    # Fill/alignment format specs: see http://www.crifan.com/python_string_format_fill_with_chars_and_set_alignment/
                elif 'Gift' in msg.get('cmdid', ''):
                    msg['MsgType'] = 'gift'
                else:
                    msg['MsgType'] = 'other'
            except Exception as e:
                print('Message parsing error')
            else:
                # self.msgPipe.append(msg)
                pass


if __name__ == '__main__':
    cm = DanMuClientManager()
    cm.start()
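The packets built in _init_socket and heartCoro share a 12-byte header followed by the JSON body:
- b'\xbb\xcc';
- four zero bytes;
- a 4-byte body length;
- a 2-byte command id (b'\x10\x27' for the login request, b'\x59\x27' for the heartbeat).

The sketch below makes that layout explicit with struct. It also adds a length-based reassembler, because a 1024-byte sock_recv() chunk can end in the middle of a message, and the regex in msgHandleBlock then silently drops that message.

This sketch rests on several assumptions:
- The layout is inferred only from the client code above.
- Server frames are assumed to use the same header. The b'\x10\x27' prefix matched in msgHandleBlock suggests this but does not prove it.
- Integers are assumed little-endian, which is what pack('i', ...) produces on x86.
- FrameBuffer and the constant names are hypothetical.

```python
import struct

MAGIC = b'\xbb\xcc'
# magic (2 bytes), 4 zero padding bytes, body length (int32), command id (uint16)
HEADER = struct.Struct('<2s4xiH')

CMD_MESSAGE = 0x2710    # appears on the wire as b'\x10\x27'
CMD_HEARTBEAT = 0x2759  # appears on the wire as b'\x59\x27'


def build_packet(cmd, body=b''):
    """Frame a body like _init_socket (login) and heartCoro (empty body) do."""
    return HEADER.pack(MAGIC, len(body), cmd) + body


class FrameBuffer:
    """Hypothetical helper: rebuild whole frames from arbitrary recv() chunks."""

    def __init__(self):
        self.buf = b''

    def feed(self, chunk):
        """Append a chunk; return (cmd, body) for every frame now complete."""
        self.buf += chunk
        frames = []
        while len(self.buf) >= HEADER.size:
            magic, length, cmd = HEADER.unpack_from(self.buf)
            if magic != MAGIC:
                raise ValueError('stream out of sync: bad magic bytes')
            end = HEADER.size + length
            if len(self.buf) < end:
                break  # body not fully received yet; wait for more data
            frames.append((cmd, self.buf[HEADER.size:end]))
            self.buf = self.buf[end:]
        return frames


if __name__ == '__main__':
    stream = (build_packet(CMD_MESSAGE, b'{"content":"hi"}')
              + build_packet(CMD_HEARTBEAT))
    fb = FrameBuffer()
    print(fb.feed(stream[:5]))  # [] (header still incomplete)
    print(fb.feed(stream[5:]))  # [(10000, b'{"content":"hi"}'), (10073, b'')]
```

In danmuCoro, one FrameBuffer per client could replace the raw chunk being handed to the executor, so that only complete bodies reach the parser.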

Other

  • Combining Coroutines with Threads and Processes

    Using Python multiprocessing (multiprocessing.Process) directly together with asyncio (asyncio.get_event_loop()) can cause problems. Handing the work to a process pool through loop.run_in_executor() avoids them.
    import concurrent.futures
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=3)
    # task must be a picklable module-level function; loop is the running event loop
    loop.run_in_executor(executor, task, args)
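Finally, back to the application scenario described earlier: screening highlights by a sudden rise in bullet-screen volume. The detector below is a hypothetical helper, not part of the code above. It assumes messages are already counted per fixed time bucket for one room, for example every 10 seconds. It flags a bucket when two conditions hold:
- the bucket's count is at least min_count;
- the count is more than ratio times the average of the previous history buckets.

All three thresholds are arbitrary assumptions to tune against real data.

```python
from collections import deque


class BurstDetector:
    """Hypothetical helper: flag time buckets whose bullet-screen count
    jumps well above the recent average. All thresholds are assumptions."""

    def __init__(self, history=6, ratio=3.0, min_count=20):
        self.history = deque(maxlen=history)  # counts of the previous buckets
        self.ratio = ratio          # multiple of the baseline that counts as a burst
        self.min_count = min_count  # ignore "bursts" in near-empty rooms

    def add_bucket(self, count):
        """Record one finished bucket's message count; return True on a burst."""
        is_burst = False
        if len(self.history) == self.history.maxlen:  # wait for a full baseline
            baseline = sum(self.history) / len(self.history)
            is_burst = count >= self.min_count and count > self.ratio * baseline
        self.history.append(count)
        return is_burst


if __name__ == '__main__':
    detector = BurstDetector()
    counts = [10, 12, 9, 11, 10, 13, 60, 15]  # made-up per-bucket counts
    print([i for i, c in enumerate(counts) if detector.add_bucket(c)])  # [6]
```

To feed it from the Zhanqi client, msgHandleBlock could increment a per-room counter for each 'danmu' message. That counter needs a lock, since msgHandleBlock runs in pool threads. A coroutine could then read and reset the counter once per bucket.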