This is the first day of my participation in the November Gengwen (article-updating) Challenge. Check out the event details: the last Gengwen Challenge of 2021.
0x1, Introduction
Following up on the previous article in this series (a simple crawl of a certain site's courses), the reason for crawling is again that a paid service is about to expire (the root cause is still being broke, T_T).
The technical approach is the same: UI automation (clicking around) + packet capture. This time I also take the chance to try an automation tool I have wanted to use for a long time → Pyppeteer.
Solemnly declare:
This article only documents my study of crawling techniques and does not provide ready-to-run crawler scripts. The crawled data has been deleted and was never redistributed. Do not use these techniques for illegal purposes; the author bears no responsibility for any losses caused by misuse.
0x2, Pyppeteer
1. The origin of Puppeteer?
Puppeteer is an official Google NodeJS library that controls Headless Chrome through the DevTools protocol. It provides an API that can drive most user operations in Chrome, run UI tests, and visit pages to collect data. Pyppeteer can be understood as the Python version of Puppeteer.
2. Compared with Selenium?
Compared with the Selenium library, Pyppeteer does not require tedious environment configuration. On the first run it checks whether Chromium is installed, and if not, it automatically downloads and configures it for us. Pyppeteer is also built on Python's new async/await syntax (Python 3.5 and above), so parts of it support asynchronous operation, which makes it considerably more efficient.
3. API documentation
- Official repository: github.com/pyppeteer/p…
- Official documentation: pyppeteer.github.io/pyppeteer/r…
- Puppeteer API documentation (Chinese translation): github.com/zhaoqize/pu…
4. Puppeteer architecture diagram
A brief overview (just for awareness, no need to memorize); a minimal Pyppeteer sketch follows the list:
- Puppeteer: communicates with the browser through the DevTools protocol;
- Browser: can hold browser contexts;
- BrowserContext: defines a browser session and can hold multiple pages;
- Page: has at least one frame, the main frame;
- Frame: has at least one execution context, the default execution context, in which the frame's JavaScript runs;
- Worker: has a single execution context, which makes interacting with WebWorkers convenient.
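A minimal sketch of how these layers map onto Pyppeteer objects (assuming Chromium is already downloaded):
import asyncio
from pyppeteer import launch

async def hierarchy_demo():
    browser = await launch()                                  # Browser
    context = await browser.createIncognitoBrowserContext()   # BrowserContext: an independent session
    page = await context.newPage()                            # Page: holds at least one Frame
    await page.goto('https://example.com')
    print(page.mainFrame.url)                                 # the Page's main Frame
    await browser.close()

asyncio.get_event_loop().run_until_complete(hierarchy_demo())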
5. Pyppeteer installation
- Step 1: Install pyppeteer via pip
pip install pyppeteer
- Step 2: Install Chromium
Write a simple program with the Pyppeteer library and run it; Chromium will be downloaded automatically on the first run. For example, here is a script that takes a screenshot of the Juejin home page:
import asyncio
from pyppeteer import launch

async def screen_shot():
    browser = await launch()
    page = await browser.newPage()
    await page.goto('https://juejin.cn/')
    await page.screenshot({'path': 'juejin.jpg'})
    await browser.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(screen_shot())
If the automatic download is slow, you can download the Chromium package from the Taobao mirror, decompress it, and specify executablePath when calling launch(), as follows:
# 1. Print the Chromium download URL for your platform (win32, win64, linux, mac)
from pyppeteer import chromium_downloader
print(chromium_downloader.downloadURLs.get("win64"))
# Sample output:
# https://storage.googleapis.com/chromium-browser-snapshots/Win_x64/588429/chrome-win32.zip

# 2. Replace storage.googleapis.com with the Taobao mirror npm.taobao.org/mirrors, for example:
# https://npm.taobao.org/mirrors/chromium-browser-snapshots/Win_x64/588429/chrome-win32.zip
# You can also browse and pick a build at:
# https://npm.taobao.org/mirrors/chromium-browser-snapshots

# 3. Specify executablePath when calling launch()
# (the path below is a placeholder for wherever you decompressed Chromium)
await launch({'headless': headless, 'args': launch_args,
              'executablePath': './chrome-win32/chrome.exe', 'userDataDir': './userData'})
Attached: a walkthrough of the code flow

# async declares an asynchronous function; await waits on a time-consuming (awaitable) operation

# Create an event loop and run screen_shot() until it completes
asyncio.get_event_loop().run_until_complete(screen_shot())

# Create a browser object; a dict of options can be passed in
browser = await launch()

# Create a page object; page operations are performed on it
page = await browser.newPage()

await page.goto('https://juejin.cn/')          # Navigate to the page
await page.screenshot({'path': 'juejin.jpg'})  # Save a screenshot
await browser.close()                          # Close the browser object
That's all the API basics you need to know; check the official documentation or a search engine for more. Next, let's analyze the crawl process.
0x3, Data crawling
① Accessing the login page
Login page: if not logged in → the login panel below is displayed; if logged in → you are automatically redirected to the home page.
Crawl process
- Request the login page and check whether a login QR-code node exists. If it does, sleep for 10s to wait for the user to scan the code and log in;
- If there is no QR code, we have already landed on the home page;
- In either case, proceed to group acquisition;
② Group acquisition
In the left panel you can see the planets you created/manage and the planets you joined:
Press F12 and look at the nodes:
Crawl process
- Use selectors to locate the two nodes, obtain all planet names and links, and print them so the user can enter a number to choose which planet to crawl;
- The selectors used:
- div.created-group > div:nth-child(2) > a
- div.joined-group > div:nth-child(2) > a
③ Content crawling
At first I only wanted to crawl the Essence (featured) category, but later I found that some planets have no data there at all:
The data is loaded via Ajax. The simplest way to get it, without trying to crack the interface's request rules, is to simulate scrolling and parse the nodes.
But in this scenario parsing nodes is too inefficient: there are too many combinations of tags + text + links and too many parsing rules to write. Intercepting the specific requests is more brain-free and efficient.
Next, look at the characteristics of the Ajax request, such as how the URL is composed; this will be used later to filter requests. Open the Network tab, clear it, select XHR, scroll to the bottom of the page, and observe the loaded request:
Open it to confirm it contains the data we need. Then intercept such responses, save the data locally, and finally process it all in one batch.
④ Two ways to determine when to stop scrolling
We keep scrolling down and need to determine when the data is exhausted so we can stop scrolling. Here are two ideas:
- Method 1: asyncio.sleep() + Pyppeteer looking for the bottom node
That is, an infinite loop keeps checking whether the "no more" node at the bottom is visible; if it is, the page has scrolled to the bottom (a minimal sketch of this idea follows the node snippet below):
<div _ngcontent-isv-c98="" class="no-more">
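A minimal sketch of method 1, assuming the bottom node can be selected with div.no-more:
# Sketch of method 1: scroll, sleep, and poll for the "no more" node (selector assumed to be 'div.no-more')
async def scroll_until_no_more(page, interval=1):
    while True:
        await page.evaluate('window.scrollBy(0, window.innerHeight)')  # scroll one viewport down
        await asyncio.sleep(interval)                                  # give the list time to load
        no_more = await page.querySelector('div.no-more')
        if no_more is not None:
            # Node exists; check that it is actually visible
            visible = await page.evaluate('el => el.offsetParent !== null', no_more)
            if visible:
                break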
- Method 2: JS timer + sliding distance and height judgment
That is, start a timer, track the total scrolled distance and the current page height, and compare them; when the former >= the latter, the page may have reached the bottom.
Yes, only "may", because the list might simply not have loaded yet, so add a retry counter and only stop when the retry count reaches a threshold.
I set the retry threshold to a fairly large value, which also serves as a kind of indirect sleep.
⑤ Initialize the browser
Now that the process is clear, we can start writing the crawl code. First, initialize the browser:
import asyncio
import os
import time
from pyppeteer import launch, errors
import cp_utils

# Launch configuration parameters
launch_args = [
    "--no-sandbox",  # non-sandbox mode
    "--disable-infobars",  # hide the info bar
    # Set the UA
    "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/83.0.4103.97 Safari/537.36",
    "--log-level=3",  # log level
]

# Start the browser
async def init_browser(headless=False):
    return await launch({'headless': headless,
                         'args': launch_args,
                         'userDataDir': './userData',
                         'dumpio': True,
                         'ignoreHTTPSErrors': True})
⑥ Creating a page
Then create a new page with browser.newPage(). Besides the general settings, we also add anti-WebDriver-detection tweaks:
# Create a new page
async def init_page(browser):
    page = await browser.newPage()
    await page.setViewport({'width': 1960, 'height': 1080})  # set the page width and height
    await page.setJavaScriptEnabled(True)
    await prevent_web_driver_check(page)
    return page

# Prevent WebDriver detection
async def prevent_web_driver_check(page):
    if page is not None:
        # Hide the webdriver flag
        await page.evaluateOnNewDocument("""() => {
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined })
        }""")
        # Some sites detect the browser by checking properties that JS can modify
        await page.evaluate('''() => {
            window.navigator.chrome = { runtime: {} };
        }''')
        await page.evaluate('''() => {
            Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });
        }''')
        await page.evaluate('''() => {
            Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5, 6] });
        }''')
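As a quick sanity check, here is a small sketch (reusing init_page() from above) that prints navigator.webdriver after initialization; it should come back empty:
# Sketch: verify the webdriver flag is hidden (assumes init_page() from above)
async def check_webdriver_flag(browser):
    page = await init_page(browser)
    await page.goto('https://example.com')
    flag = await page.evaluate('() => navigator.webdriver')
    print('navigator.webdriver =', flag)  # expected: None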
⑦ Logging in and pulling the planet list
Use page.waitForSelector() with a timeout to check whether the login QR-code node exists, then execute the corresponding logic:
# Log in
async def login(page, timeout=60):
    await page.goto(login_url, options={'timeout': int(timeout * 1000)})
    try:
        # A QR-code node exists → not logged in yet, wait for the user to scan it
        await page.waitForSelector('img.qrcode', {'visible': 'visible', 'timeout': 3000})
        await asyncio.sleep(10)
        await fetch_group(page)
    except errors.TimeoutError:
        print("Logged-in state detected, pulling the planet list...")
        await fetch_group(page)

# Pull the planet (group) list
async def fetch_group(page):
    global choose_group_id, choose_group_name
    group_list = []
    created_groups = await page.JJ('div.created-group > div:nth-child(2) > a')
    joined_groups = await page.JJ('div.joined-group > div:nth-child(2) > a')
    for item in created_groups + joined_groups:
        group_name = await page.evaluate('item => item.textContent', item)
        group_url = await (await item.getProperty('href')).jsonValue()
        group_list.append([group_name.strip(), group_url])
    print("Crawlable planet list:")
    for index, group in enumerate(group_list):
        print(index, ', ', group)
    choose_group_index = input("Please enter the number of the planet to crawl => ")
    choose_group = group_list[int(choose_group_index)]
    choose_group_id = choose_group[1].split('/')[-1]
    choose_group_name = choose_group[0]
    await fetch_data(page, choose_group[1])
The running results are as follows:
⑧ Intercepting requests and responses, and saving data
# Request interception
async def intercept_request(req):
    # Block images, multimedia resources and websocket requests
    if req.resourceType in ['image', 'media', 'eventsource', 'websocket']:
        await req.abort()
    else:
        await req.continue_()

# Response interception
async def intercept_response(resp):
    resp_type = resp.request.resourceType
    if resp_type in ['xhr'] and 'https://xxx/v2/groups/{}/topics?scope=all&count=20'.format(
            choose_group_id) in resp.url:
        content = await resp.text()
        if len(content) > 0:
            temp_dir = os.path.join(content_save_dir, choose_group_name)
            cp_utils.is_dir_existed(temp_dir)
            print(resp.url + ' → ' + content)
            json_save_path = os.path.join(temp_dir, str(int(time.time() * 1000)) + '.json')
            cp_utils.write_str_data(content, json_save_path)
            print("Saved file:", json_save_path)
    return resp
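cp_utils is a small personal utility module that is not listed in this article; a minimal sketch of what the helpers used above might look like (hypothetical implementations, assumed behavior only):
# Hypothetical minimal versions of the cp_utils helpers used in this article
import os

def is_dir_existed(dir_path):
    # Create the directory if it does not exist yet
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)

def write_str_data(content, file_path):
    # Append a string to a file, creating the file if necessary
    with open(file_path, 'a+', encoding='utf-8') as f:
        f.write(content + '\n')

def read_content_from_file(file_path):
    # Read a whole file as a string
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()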
⑨ Infinite scrolling
# Crawl the chosen planet's data
async def fetch_data(page, url, timeout=60):
    # Enable request interception
    await page.setRequestInterception(True)
    page.on('request', lambda req: asyncio.ensure_future(intercept_request(req)))
    page.on('response', lambda resp: asyncio.ensure_future(intercept_response(resp)))
    print("Start crawling planet: ", choose_group_name)
    await page.goto(url, options={'timeout': int(timeout * 1000)})
    # Wait 3 seconds for the page to load
    await asyncio.sleep(3)
    # Scroll down until the bottom is reached
    await page.evaluate("""async () => {
        await new Promise((resolve, reject) => {
            const distance = 100;
            var totalHeight = 0;
            var maxTries = 20000;
            var curTries = 0;
            var timer = setInterval(() => {
                var scrollHeight = document.body.scrollHeight;
                window.scrollBy(0, distance)
                totalHeight += distance
                console.log(totalHeight + "-" + scrollHeight)
                if (totalHeight >= scrollHeight) {
                    if (curTries > maxTries) {
                        clearInterval(timer)
                        resolve();
                    } else {
                        curTries += 1;
                        totalHeight -= distance
                    }
                } else {
                    curTries = 0;
                }
            }, 100)
        });
    }""")
    print("Planet {} crawl finished".format(choose_group_name))
    # Cancel request interception
    await page.setRequestInterception(False)
Finally call:
if __name__ == '__main__':
    cur_browser = asyncio.get_event_loop().run_until_complete(init_browser())
    cur_page = asyncio.get_event_loop().run_until_complete(init_page(cur_browser))
    asyncio.get_event_loop().run_until_complete(login(cur_page))
After running, you can see the corresponding crawl information in the console output:
You can also see the JSON files crawled to local disk:
Nice, the data is saved locally. Next: data processing.
0x4, Data processing
① Key data extraction
Open a few JSON samples and look at the key parts:
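For reference, based on the fields extracted below, the relevant part of each response looks roughly like this (all values here are made-up placeholders):
# Rough shape of the intercepted response JSON, inferred from the fields the extraction code reads
sample_resp = {
    "resp_data": {
        "topics": [
            {
                "talk": {
                    "owner": {"name": "some user"},
                    "text": "post text with tags and links...",
                    "images": [{"original": {"url": "https://example.com/xxx"}}],
                    "files": [{"file_id": 123456, "name": "demo.pdf"}]
                }
            }
        ]
    }
}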
Define the extract entity class:
class Talk:
    def __init__(self, name=None, text=None, images=None, files=None):
        self.name = name
        self.text = text
        self.images = images
        self.files = files
Then iterate over the files, turn each one into a dict with json.loads(), and pick out the fields we need on demand. It's very simple:
import cp_utils
import json
import os

zsxq_save_dir = os.path.join(os.getcwd(), "zsxq")
result_json_path = os.path.join(os.getcwd(), "zsxq_result.json")
talk_list = []
talk_dict = {'data': None}

# Data entity
class Talk:
    def __init__(self, name=None, text=None, images=None, files=None):
        self.name = name
        self.text = text
        self.images = images
        self.files = files

    def to_json_str(self):
        return json.dumps({'name': self.name, 'text': self.text, 'images': self.images, 'files': self.files},
                          ensure_ascii=False)

    def to_dict(self):
        return {'name': self.name, 'text': self.text, 'images': self.images, 'files': self.files}

# Extract the content of a JSON file
def extract_json_file(file_path):
    global talk_list
    content = cp_utils.read_content_from_file(file_path)
    content_dict = json.loads(content)
    topics = content_dict['resp_data'].get('topics')
    print("Parse file: {}".format(file_path))
    if topics is not None and len(topics) > 0:
        for topic in topics:
            talk_entity = Talk()
            talk = topic.get('talk')
            if talk is not None:
                # Get name, text, images and files in sequence
                owner = talk.get('owner')
                if owner is not None:
                    owner_name = owner.get("name")
                    if owner_name is not None:
                        talk_entity.name = owner_name
                text = talk.get('text')
                if text is not None:
                    talk_entity.text = text
                images = talk.get('images')
                if images is not None and len(images) > 0:
                    image_urls = []
                    for image in images:
                        original = image.get('original')
                        if original is not None:
                            image_urls.append(original.get('url'))
                    talk_entity.images = image_urls
                files = talk.get('files')
                if files is not None and len(files) > 0:
                    file_list = []
                    for file in files:
                        file_id = file.get('file_id')
                        file_name = file.get('name')
                        file_list.append({file_id: file_name})
                    talk_entity.files = file_list
                talk_list.append(talk_entity.to_dict())
    else:
        print("Data is empty, skip file...")

if __name__ == '__main__':
    dir_list = cp_utils.fetch_all_file(zsxq_save_dir)
    print("Operable directories: \n")
    for index, path in enumerate(dir_list):
        print("{}, {}".format(index, path))
    choose_index = input("\nPlease enter the directory number to be processed => ")
    choose_path = dir_list[int(choose_index)]
    print("Currently selected directory: {}".format(choose_path))
    json_file_list = cp_utils.filter_file_type(choose_path, '.json')
    for json_file in json_file_list[:10]:
        extract_json_file(json_file)
    talk_dict['data'] = talk_list
    talk_json = json.dumps(talk_dict, ensure_ascii=False, indent=2)
    cp_utils.write_str_data(talk_json, result_json_path)
    print("File written: {}".format(result_json_path))
Go through 10 files and try it:
Open the JSON file and see:
② Converting JSON to Markdown
JSON is definitely not easy to read, so we can generate a Markdown file by concatenating strings. The main difficulty here is:
text parsing: hashtags, plain text, external links, emoji, and so on...
Replace the tag and external-link markup with re.sub() plus backreferences. First, write a bit of test code to test the waters:
import re
from urllib.parse import unquote

# Substitution regexes (patterns for the hashtag tags and external-link tags)
hash_tag_pattern = re.compile(r'(
    '
)
web_pattern = re.compile(r'(
    '
)

# Test case
xml_str = """
"""
temp_result = unquote(hash_tag_pattern.sub(r"\g<2>", xml_str), 'utf-8')
temp_result = unquote(web_pattern.sub(r"[\g<4>](\g<2>)", temp_result), 'utf-8')
temp_result = temp_result.strip().replace("\n", "")
print(temp_result)
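Since the site's real tag markup is not reproduced above, here is a generic illustration of the same technique, re.sub() with group backreferences plus unquote(), using a made-up tag format:
# Made-up tag format, only to demonstrate re.sub() with backreferences
import re
from urllib.parse import unquote

link_pattern = re.compile(r'<e type="web" href="([^"]+)" title="([^"]+)" */?>')

demo = '<e type="web" href="https%3A%2F%2Fjuejin.cn" title="Juejin" />'
# \g<2> is the title, \g<1> is the URL-encoded href → a Markdown link
result = unquote(link_pattern.sub(r'[\g<2>](\g<1>)', demo), 'utf-8')
print(result)  # [Juejin](https://juejin.cn)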
Take a look at the parsing results:
Good. Then complete the image and file handling:
# Convert to an MD file
def json_to_md(file_path):
    content = cp_utils.read_content_from_file(file_path)
    data_list = json.loads(content)['data']
    md_content = ''
    for data in data_list:
        name = data['name']
        if name is not None:
            md_content += name + "\n"
        text = data['text']
        if text is not None:
            temp_result = unquote(hash_tag_pattern.sub(r"\g<2>", text), 'utf-8').replace("#", "`")
            temp_result = unquote(web_pattern.sub(r"[\g<4>](\g<2>)", temp_result), 'utf-8')
            md_content += temp_result.strip()
        images = data['images']
        if images is not None:
            md_content += '\n'
            for image_url in images:
                img_file_name = str(int(time.time() * 1000)) + ".jpg"
                img_save_path = os.path.join(image_save_dir, img_file_name)
                cp_utils.download_pic(img_save_path, image_url)
                relative_path = 'images/{}'.format(img_file_name)
                md_content += '![]({})'.format(relative_path)
        files = data['files']
        if files is not None:
            md_content += '\n Files: '
            for file in files:
                file_id = file.get('file_id')
                file_name = file.get('name')
                md_content += " {}".format(file_name)
        md_content += '\n\n---\n\n'
    cp_utils.write_str_data(md_content, result_md_path)
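A hypothetical invocation, assuming image_save_dir and result_md_path are configured like the other paths above:
if __name__ == '__main__':
    # image_save_dir / result_md_path are assumed to be set up like the paths earlier in the script
    image_save_dir = os.path.join(os.getcwd(), "images")
    result_md_path = os.path.join(os.getcwd(), "zsxq_result.md")
    json_to_md(result_json_path)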
If you look carefully, you may notice that the code downloads the images instead of referencing the remote URLs. The reason is that the site's image URLs have no file-name suffix, so Markdown syntax cannot recognize them and the images would not show in the preview. The generated MD file:
The file is a bit large (46,257 characters) and PyCharm struggles to render the preview, so I opened it with MarkdownPad2 instead:
0x5, Summary
Also, the file entries here only contain an ID; the real download URL has to be obtained by calling another interface, which also requires a logged-in state. Interested readers can give it a try.
Ok, that’s all. If you have any questions, welcome to point out in the comment section. Thank you ~
References:
- Introduction to Pyppeteer and a Chinese tutorial
- Pyppeteer – your automation weapon!
- Crawler series | Pyppeteer: a new artifact in the crawler world, more efficient than Selenium
- (Latest) How to properly remove window.navigator.webdriver in Pyppeteer
- Puppeteer automated testing series 3 – common Puppeteer operations in end-to-end testing