This is the first day of my participation in the Gwen Challenge in November. Check out the details: the last Gwen Challenge in 2021

0 x1, introduction

Similarly, keep up with the section “Van came Python | a site course simple crawl, crawl is the cause of paid services will expire (root cause or poor man T T)

The technical solution is also: dot dot + capture package, this section by the way to try to use a long time automatic artifact → Pyppeteer

Solemnly declare:

This paper is only used to record the study of crawler technology, and will not provide direct crawler scripts. The crawler data has been deleted and not transmitted. Do not use it for illegal purposes. If any other illegal purposes cause losses, it is irrelevant to this article.


0x2, Pyppeteer

1. The origin of Puppeteer?

Puppeteer is an official Google NodeJS library that controls Headless Chrome through the DevTools protocol. Puppeteer provides an API that directly controls most user operations in Chrome, performs UI tests, and crawls page visits to collect data. Pyppeter can be understood as the Python version of Puppeteer.

2. Compared with Selenium?

Compared to the Selenium library, Pyppeteer does not require tedious environment configuration. When running for the first time, Pyppeteer will check whether it follows Chromium, and the uninstalled program will automatically install and configure it for us. And Pyppeteer is based on Python’s new async implementation (Python 3.5 and above), so some of its implementations also support asynchronous operations, which is much more efficient.

3. API documentation

  • Official warehouse: github.com/pyppeteer/p…
  • The official document: pyppeteer. Making. IO/pyppeteer/r…
  • Official Documentation (English – Puppeteer) : github.com/zhaoqize/pu…

4. Puppeteer architecture diagram

Brief introduction (just know, don’t remember) :

  • Puppeteer: communicates with the browser through the DevTools protocol.
  • Browser: can hold the Browser context;
  • BrowserContext: defines a browser session and can have multiple pages;
  • Page: at least one frame, main frame;
  • Frame: At least one execution context, the default execution context (the Frame’s JavaScript) is executed;
  • Worker: with a single execution context, it is convenient to interact with WebWorkers;

5, Pyppeteer installation

  • Step 1: Install pyppeteer for PIP
pip install pyppeteer
Copy the code
  • Step 2: Install Chromium

Write a simple application using the Pyppeteer library, run it, it will automatically download, such as the script to generate a screenshot of the front page of gold digging:

import asyncio
from pyppeteer import launch


async def screen_shot() :
    browser = await launch()
    page = await browser.newPage()
    await page.goto('https://juejin.cn/')
    await page.screenshot({'path': 'juejin.jpg'})
    await browser.close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(screen_shot())
Copy the code

You can download the executablePath package from Taobao, decompress it, and specify the executablePath at launch() as follows:

From pyppeteer import chromium_downloader Win32 win64, Linux, MAC print (chromium_downloader. DownloadURLs. Get (" win64 ")) # run sample output: # https://storage.googleapis.com/chromium-browser-snapshots/Win_x64/588429/chrome-win32.zip # 2) Storage.googleapis.com is replaced with taobao source npm.taobao.org/mirrors, for example: https://npm.taobao.org/mirrors/chromium-browser-snapshots/Win_x64/588429/chrome-win32.zip # can also into the site to choose: https://npm.taobao.org/mirrors/chromium-browser-snapshots # 3, launch specified when userDataDir await launch ({' headless ': headless,'args': launch_args, 'executablePath': './userData')Copy the code

Attached: code flow analysis

async Declare an asynchronous operation
await Declare a time-consuming operation

Create an asynchronous pool and execute screen_shot()
asyncio.get_event_loop().run_until_complete(screen_shot()) 

Create a browser object that can be passed as a dictionary parameter
browser = await launch() 

Create a page object on which page operations are performed
page = await browser.newPage() 

await page.goto('https://juejin.cn/') # page jump
await page.screenshot({'path': 'juejin.jpg'}) # Save screenshot
await browser.close() Close the browser object
Copy the code

That’s all you need to know about the API, check out the official documentation for more, or use search engines, and then analyze the crawl process.


0x3 data crawl

â‘  Access the login page

Login page: Not logged in → the following login panel will be displayed; logged in → the main page will be automatically redirected.

Crawl process

  • Request the login page to determine whether there is a login TWO-DIMENSIONAL code node. If there is, sleep for 10s to wait for scanning code login.
  • There is no QR code, it means to enter the home page;
  • The above two steps enter the grouping to obtain;

â‘¡ Group acquisition

On the left panel you can see: Created/managed planets and added planets:

F12 look at the node:

Crawl process

  • Through the selector to locate the two junction points, to obtain all the planet name and link, output for the user number to choose the planet to climb;
  • The selector d
  • Through the selector selector to locate the two nodes, get all the planet names and links, output for the user to select the planet to climb;
  • Div. Created -group > div:nth-child(2) > a

â‘¢ Content crawl

At first I just wanted to climb down the elite classification, but later I found that some planets may be like this without data:

With Ajax, the easiest way to get data without trying to crack the rules of the interface is to simulate scrolling + parsing nodes.

But in this scenario, parsing nodes is too inefficient, there are too many tag + text + link combinations, too many parsing rules to write, and intercepting specific requests is more mindless and efficient.

Then take a look at the features of the Ajax request, such as the composition of the URL, which will be used for filtering the request later. Open the Network TAB, clear it, select XHR, scroll to the bottom of the page, and see the loaded request:

Open it, make sure it’s the required data, intercept the request, save the data locally, and finally batch it together.

â‘£ Two ways to determine when rolling stops

Keep scrolling down, need to determine when the data is finished, and stop scrolling. Here are two ideas:

  • Asyncio.sleep () + pyppeteer to find the bottom node

This is an infinite loop that keeps checking to see if the bottom node is visible, so the site slides to the bottom:

<div _ngcontent-isv-c98="" class="no-more">Copy the code
  • Method 2: JS timer + sliding distance and height judgment

Is to open open a timer, record the scrolling distance and the current page height, compare the former >= the latter, it may slide to the bottom.

Yes, it is possible, because there is no load on the list, so you can add a number of retries, only when the number of retries reaches the threshold.

I set the retry threshold to a large value, which is also considered as indirect hibernation.

⑤ Initialize the browser

Now that the process is clear, I can start writing code to implement the crawl, initializing the browser:

import asyncio
import os
import time
from pyppeteer import launch

import cp_utils

Start configuration parameters
launch_args = [
    "--no-sandbox".# Non-sandbox mode
    "--disable-infobars".# Hide the message bar
    # set UA
    "- the user-agent = Mozilla / 5.0 (Windows NT 10.0; Win64; X64) AppleWebKit/537.36 (KHTML, like Gecko)"
    "Chrome / 83.0.4103.97 Safari / 537.36"."--log-level=3".# log level
]

# start browser
async def init_browser(headless=False) :
    return await launch({'headless': headless,
                         'args': launch_args,
                         'userDataDir': './userData'.'dumpio': True.'ignoreHTTPSErrors ': True})
Copy the code

â‘¥ Creating a Page

Then create a new browser page with browser.newPage(), adding anti-Webdrivder detection in addition to the general Settings

# create a new page
async def init_page(browser) :
    page = await browser.newPage()
    await page.setViewport({'width': 1960.'height': 1080}) Set the page width and height
    await page.setJavaScriptEnabled(True)
    await prevent_web_driver_check(page)
    return page


# Prevent WebDriver detection
async def prevent_web_driver_check(page) :
    if page is not None:
        # Hide webDriver features
        await page.evaluateOnNewDocument("""() => { Object.defineProperty(navigator, 'webdriver', { get: () => undefined })} """)
        Some sites will call JS to modify the result in order to detect the browser
        await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {}, }; } ' ' ')
        await page.evaluate(
            '''() =>{ Object.defineProperty(navigator, 'lang uages', { get: () => ['en-US', 'en'] }); } ' ' ')
        await page.evaluate(
            "' () = > {Object. DefineProperty (the navigator," plugins ", {get: () = > [1, 2, 3, 4, 5, 6],}); } ' ' ')
Copy the code

⑦ List of landing and pulling planets

Use page.waitForSelector() to set timeout, check whether there is a login QR code node, execute the following logic:

Async def login(page, timeout=60): await page.goto(login_url, options={'timeout': int(timeout * 1000)}) try: Await page.waitForSelector('img.qrcode', {'visible': 'visible', 'timeout': 3000}) ) await asyncio.sleep(10) await fetch_group(page) except errors.TimeoutError: print(" detect login state, pull list..." Async def fetch_group(page) async def fetch_group(page): global choose_group_id, Group_list = [] created_groups = await page.jj ('div.created-group > div:nth-child(2) > a') joined_groups = await page.JJ('div.joined-group > div:nth-child(2) > a') for item in created_groups + joined_groups: group_name = await page.evaluate('item => item.textContent', item) group_url = await (await item.getProperty('href')).jsonValue() group_list.append([group_name.strip(), Group_url]) print(" for index, group in enumerate(group_list): Print (index, ', ', group) choose_group_index = input(" Choose_group = group_list[int(choose_group_index)] choose_group_id = choose_group[1].split('/')[-1] choose_group_name = choose_group[0] await fetch_data(page, choose_group[1])Copy the code

The running results are as follows:

⑧ Interception of requests, responses and data preservation

# request blocking
async def intercept_request(req) :
    Disallow images, multimedia resources, and Websocket requests
    if req.resourceType in ['image'.'media'.'eventsource'.'websocket'] :await req.abort()
    else:
        await req.continue_()


# intercept response
async def intercept_response(resp) :
    resp_type = resp.request.resourceType
    if resp_type in ['xhr'] and 'https://xxx/v2/groups/{}/topics? scope=all&count=20'.format(
            choose_group_id) in resp.url:
        content = await resp.text()
        if len(content) > 0:
            temp_dir = os.path.join(content_save_dir, choose_group_name)
            cp_utils.is_dir_existed(temp_dir)
            content = await resp.text()
            print(resp.url + ' → ' + content)
            json_save_path = os.path.join(temp_dir, str(int(time.time() * 1000)) + '.json')
            cp_utils.write_str_data(content, json_save_path)
            print("Save file:", json_save_path)
    return resp
Copy the code

⑨ Infinite scrolling

Async def fetch_data(page, url, timeout=60): # intercept request fetching await page. SetRequestInterception (True) page. On (' request, lambda the req: asyncio.ensure_future(intercept_request(req))) page.on('response', lambda resp: Asyncio. ensure_future(intercept_Response (resp)) print(" ", choose_group_name) await page.goto(url, options={'timeout': Int (timeout * 1000)}) # wait 3 seconds to load await asyncio.sleep(3) # scroll down await await page. Evaluate (" "async () => {await new Promise((resolve, reject) => {const distance = 100; Var totalHeight = 0; Var maxTries = 20000; var curTries = 0; var timer = setInterval(() => { var scrollHeight = document.body.scrollHeight; window.scrollBy(0, distance) totalHeight += distance console.log(totalHeight + "-" + scrollHeight) if (totalHeight >= scrollHeight) { if(curTries > maxTries) { clearInterval(timer) resolve(); } else { curTries += 1; totalHeight -= distance } } else { curTries = 0; }}, 100)})); } "") print(" {}") . The format (choose_group_name)) # cancels intercept await page. SetRequestInterception (False)Copy the code

Finally call:

if __name__ == '__main__':
    cur_browser = asyncio.get_event_loop().run_until_complete(init_browser())
    cur_page = asyncio.get_event_loop().run_until_complete(init_page(cur_browser))
    asyncio.get_event_loop().run_until_complete(login(cur_page))
Copy the code

After running, you can see the console output corresponding crawl information:

You can also see that you can crawl to a local JSON file:

Yo, the data is saved locally, and then the data processing


0x4 data processing

â‘  Key data extraction

Open a few json samples to see the key parts of json:

Define the extract entity class:

class Talk:
    def __init__(self, name=None, text=None, images=None, files=None) :
        self.name = name
        self.text = text
        self.images = images
        self.files = files
Copy the code

Loads are then iterated over the file, json.loads are changed to dict, loads are picked up on demand, it’s very simple:

import cp_utils
import json
import os

zsxq_save_dir = os.path.join(os.getcwd(), "zsxq")
result_json_path = os.path.join(os.getcwd(), "zsxq_result.json")
talk_list = []
talk_dict = {'data': None}


# Data entity
class Talk:
    def __init__(self, name=None, text=None, images=None, files=None) :
        self.name = name
        self.text = text
        self.images = images
        self.files = files

    def to_json_str(self) :
        return json.dumps({'name': self.name, 'text': self.text, 'images': self.images, 'files': self.files},
                          ensure_ascii=False)

    def to_dict(self) :
        return {'name': self.name, 'text': self.text, 'images': self.images, 'files': self.files}


Extract the content of the JSON file
def extract_json_file(file_path) :
    global talk_list
    content = cp_utils.read_content_from_file(file_path)
    content_dict = json.loads(content)
    topics = content_dict['resp_data'].get('topics')
    print("Parse file: {}".format(file_path))
    if topics is not None and len(topics) > 0:
        for topic in topics:
            talk_entity = Talk()
            talk = topic.get('talk')
            if talk is not None:
                Get name, text, image, file in sequence
                owner = talk.get('owner')
                if owner is not None:
                    owner_name = owner.get("name")
                    if owner is not None:
                        talk_entity.name = owner_name
                text = talk.get('text')
                if text is not None:
                    talk_entity.text = text
                images = talk.get('images')
                if images is not None and len(images) > 0:
                    image_urls = []
                    for image in images:
                        original = image.get('original')
                        if original is not None:
                            image_urls.append(original.get('url'))
                    talk_entity.images = image_urls
                files = talk.get('files')
                if files is not None and len(files) > 0:
                    file_list = []
                    for file in files:
                        file_id = file.get('file_id')
                        file_name = file.get('name')
                        file_list.append({file_id: file_name})
                    talk_entity.files = file_list
            talk_list.append(talk_entity.to_dict())
    else:
        print("Data is empty, skip file...")


if __name__ == '__main__':
    dir_list = cp_utils.fetch_all_file(zsxq_save_dir)
    print("Operable directory: \n")
    for index, path in enumerate(dir_list):
        print("{}, {}".format(index, path))
    choose_index = input("\n Please enter the directory number to be processed =>")
    choose_path = dir_list[int(choose_index)]
    print("Currently selected directory: {}".format(choose_path))
    json_file_list = cp_utils.filter_file_type(choose_path, '.json')
    for json_file in json_file_list[:10]:
        extract_json_file(json_file)
    talk_dict['data'] = talk_list
    talk_json = json.dumps(talk_dict, ensure_ascii=False, indent=2)
    cp_utils.write_str_data(talk_json, result_json_path)
    print("File written: {}".format(result_json_path))
Copy the code

Go through 10 files and try it:

Open the JSON file and see:

â‘¡ Convert json to Markdown

Json is definitely not easy to read. You can generate a Markdown, concatenated string. The main difficulty here is:

Text parsing, tags, plain text, external links, emoticons…

To replace a wave of tags and external links with re.sub() + backreference, write a test code to test the waters:

    # substitution re
    hash_tag_pattern = re.compile(r'(
      
        '
      )
    web_pattern = re.compile(r'(
      
        '
      )
   
    # Test cases
    xml_str = """ 
       
       """
    temp_result = unquote(hash_tag_pattern.sub(r"\g<2>", xml_str), 'utf-8')
    temp_result = unquote(web_pattern.sub(r"[\g<4>](\g<2>)", temp_result), 'utf-8')
    temp_result = temp_result.strip().replace("\n"."")
    print(temp_result)
Copy the code

Take a look at the parsing results:

Good, then complete the picture and file related:

Convert to MD file
def json_to_md(file_path) :
    content = cp_utils.read_content_from_file(file_path)
    data_list = json.loads(content)['data']
    md_content = ' '
    for data in data_list:
        name = data['name']
        if name is not None:
            md_content += name + "\n"
        text = data['text']
        if text is not None:
            temp_result = unquote(hash_tag_pattern.sub(r"\g<2>", text), 'utf-8').replace("#"."`")
            temp_result = unquote(web_pattern.sub(r"[\g<4>](\g<2>)", temp_result), 'utf-8')
            md_content += temp_result.strip()
        images = data['images']
        if images is not None:
            md_content += '\n'
            for image_url in images:
                img_file_name = str(int(time.time() * 1000)) + ".jpg"
                img_save_path = os.path.join(image_save_dir, str(int(time.time() * 1000)) + ".jpg")
                cp_utils.download_pic(img_save_path, image_url)
                relative_path = 'images/{}'.format(img_file_name)
                md_content += '! [] ({}) '.format(relative_path)
        files = data['files']
        if files is not None:
            md_content += '\n file: '
            for file in files:
                file_id = file.get('file_id')
                file_name = file.get('name')
                md_content += "{}".format(file_name)
        md_content += '\n\n---\n\n'
    cp_utils.write_str_data(md_content, result_md_path)
Copy the code

If you are careful, you may find that the image is downloaded in the code, and the remote image is not adopted. The reason is that the image resource URL of the site does not have the suffix name of the image, and Markdown syntax cannot recognize it, so the image cannot be displayed in the preview. Generated MD file:

PyCharm (46257 characters), PyCharm (46257 characters), MarkwonPAD2 (46257 characters


0 x 5, summary

In addition, there is only an ID in the file class here, the real download address has to call another interface to obtain, but also in the login state, interested students can try a wave.

Ok, that’s all. If you have any questions, welcome to point out in the comment section. Thank you ~


References:

  • Introduction to Pyppeteer and Chinese tutorial

  • Pyppetter – Your automated weapon!

  • Reptilian Series Pyppeteer: a new artifact in the reptilian world that is more efficient than Selenium

  • (latest version) the window. How to properly remove Pyppeteer navigator. Webdriver

  • Puppeteer Automated Test series 3 – puppeteer operations commonly used in end-to-end testing