Chapter 1: Introduction to Crawlers
1. Get to know the crawler
Chapter 2: Requests Hands-on (Basic Crawler)
1. Douban Movie crawl
2. KFC restaurant query
3. Crack Baidu Translate
4. Sogou home page
5. Web collector
6. Data from the China Food and Drug Administration
Chapter 3: Crawler Data Parsing (bs4, XPath, Regular Expressions)
1. bs4 parsing basics
2. bs4 case
3. XPath parsing basics
4. XPath parsing case – 4K picture crawl
5. XPath parsing case – 58.com second-hand houses
6. XPath parsing case – crawl free resume templates from webmaster materials
7. XPath parsing case – national city names
8. Regex parsing
9. Regex parsing – paginated crawl
10. Crawl pictures
Chapter 4: Automatic identification of captcha
1. Gushiwen verification code recognition; fateadm_api.py (the configuration required for recognition, recommended to keep in the same folder) wraps the API calls
Chapter 5: Advanced Request Module (Simulated Login)
1. Proxy operations 2. Simulated login on Renren 3. Crawl the logged-in Renren user's profile page data
Chapter 6: High-performance Asynchronous crawlers (Thread Pools, Coroutines)
1. Aiohttp implements multi-task asynchronous crawler
2. Flask test server
3. Multitasking coroutines
4. Multi-task asynchronous crawler
5. Example
6. Synchronous crawler
7. Basic use of thread pools
8. Application of thread pool in crawler case
9. Coroutines
Chapter 7: Dynamic Loading data Processing (Selenium Module Application, Simulated login 12306)
1. Selenium basics
2. Selenium other automatic operations
3. 12306 example login code
4. Processing of action chain and IFrame
5. Chrome headless browser + evading detection
6. Implement 12306 simulated login based on Selenium
7. Simulate logging in to Qzone
Chapter 8: Scrapy Framework
1. Project practice and Scrapy configuration changes
2. BossPro example 3. QiubaiPro example 4. Database example
Chapter 1: Introduction to Crawlers
Level 0: Get to know the crawler
1. First look at crawlers
Crawlers, essentially, use programs to retrieve valuable data from the web.
2. Clarify the path
2-1. How a browser works
(1) Data parsing: when the server sends data back to the browser, the browser does not hand it to us as-is; the data is written in the computer's language, so the browser first translates it into something we can understand;
(2) Data extraction: we pick out the useful data from what we get;
(3) Data storage: the selected useful data is saved to a file or database.
2-2. How a crawler works
(1) Data acquisition: the crawler sends a request to the server for the URL we provide, and the server returns data;
(2) Data parsing: the crawler parses the data returned by the server into a format we can understand;
(3) Data extraction: the crawler extracts the data we need from it;
(4) Data storage: the crawler saves this useful data for later use and analysis.
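Putting the four steps together, a minimal sketch in Python (the URL and file name below are placeholders for illustration, not taken from the original notes):

import requests
from bs4 import BeautifulSoup

url = 'https://example.com/'                     # placeholder URL
page_text = requests.get(url).text               # (1) data acquisition
soup = BeautifulSoup(page_text, 'lxml')          # (2) data parsing
title = soup.title.string                        # (3) data extraction
with open('result.txt', 'w', encoding='utf-8') as fp:
    fp.write(title)                              # (4) data storage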
Copyright notice: this article was originally published by CSDN blogger "YK Kundi" under the CC 4.0 BY-SA license; please attach the original source link and this statement when reposting. Original link: blog.csdn.net/qq_45803923…
Chapter 2: Requests Hands-on (Basic Crawler)
1. Douban Movie crawl
import requests
import json

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = "https://movie.douban.com/j/chart/top_list"
params = {
    'type': '24',
    'interval_id': '100:90',
    'action': '',
    'start': '0',    # which movie in the library to start from
    'limit': '20'    # number of movies fetched per request
}
response = requests.get(url, params=params, headers=headers)
list_data = response.json()
fp = open('douban.json', 'w', encoding='utf-8')
json.dump(list_data, fp=fp, ensure_ascii=False)
print('over!!!')
2. KFC restaurant query
import requests

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = 'http://www.kfc.com.cn/kfccda/ashx/GetStoreList.ashx?op=keyword'
word = input('Please enter an address:')
params = {
    'cname': '',
    'pid': '',
    'keyword': word,
    'pageIndex': '1',
    'pageSize': '10'
}
response = requests.post(url, params=params, headers=headers)
page_text = response.text
fileName = word + '.txt'
with open(fileName, 'w', encoding='utf-8') as f:
    f.write(page_text)
3. Crack Baidu Translate

import requests
import json

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
post_url = 'https://fanyi.baidu.com/sug'
word = input('enter a word:')
data = {
    'kw': word
}
response = requests.post(url=post_url, data=data, headers=headers)
dic_obj = response.json()
fileName = word + '.json'
fp = open(fileName, 'w', encoding='utf-8')
# ensure_ascii=False keeps the Chinese characters readable
json.dump(dic_obj, fp=fp, ensure_ascii=False)
print('over!')
4. Sogou home page

import requests

url = 'https://www.sogou.com/?pid=sogou-site-d5da28d4865fb927'
response = requests.get(url)
page_text = response.text
print(page_text)
with open('./sougou.html', 'w', encoding='utf-8') as fp:
    fp.write(page_text)
print('Data crawl finished!!')
5. Web collector
import requests

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = 'https://www.sogou.com/sogou'
kw = input('enter a word:')
param = {
    'query': kw
}
response = requests.get(url, params=param, headers=headers)
page_text = response.text
fileName = kw + '.html'
with open(fileName, 'w', encoding='utf-8') as fp:
    fp.write(page_text)
print(fileName, 'saved successfully!!')
6. Data from the China Food and Drug Administration

import requests
import json

url = "http://scxk.nmpa.gov.cn:81/xk/itownet/portalAction.do?method=getXkzsList"
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4385.0 Safari/537.36'
}
id_list = []
all_data_list = []
for page in range(1, 6):
    page = str(page)
    data = {
        'on': 'true',
        'page': page,
        'pageSize': '15',
        'productName': '',
        'conditionType': '1',
        'applyname': '',
        'applysn': ''
    }
    json_ids = requests.post(url, data=data, headers=headers).json()
    for dic in json_ids['list']:
        id_list.append(dic['ID'])
# print(id_list)
post_url = 'http://scxk.nmpa.gov.cn:81/xk/itownet/portalAction.do?method=getXkzsById'
for id in id_list:
    data = {
        'id': id
    }
    detail_json = requests.post(url=post_url, data=data, headers=headers).json()
    # print(detail_json, '---------------------over')
    all_data_list.append(detail_json)
fp = open('allData.json', 'w', encoding='utf-8')
json.dump(all_data_list, fp=fp, ensure_ascii=False)
print('over!!!')
Chapter 3: Crawler Data Parsing (bs4, XPath, Regular Expressions)
1. bs4 parsing basics
from bs4 import BeautifulSoup

fp = open('Chapter 3 Data Parsing/text.html', 'r', encoding='utf-8')
soup = BeautifulSoup(fp, 'lxml')
# print(soup)
# print(soup.a)
# print(soup.div)
# print(soup.find('div'))
# print(soup.find('div', class_="song"))
# print(soup.find_all('a'))
# print(soup.select('.tang'))
# print(soup.select('.tang > ul > li > a')[0].text)
# print(soup.find('div', class_="song").text)
# print(soup.find('div', class_="song").string)
print(soup.select('.tang > ul > li > a')[0]['href'])
2. bs4 case

from bs4 import BeautifulSoup
import requests

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = "http://sanguo.5000yan.com/"
page_text = requests.get(url, headers=headers).content
# print(page_text)
soup = BeautifulSoup(page_text, 'lxml')
li_list = soup.select('.list > ul > li')
fp = open('./sanguo.txt', 'w', encoding='utf-8')
for li in li_list:
    title = li.a.string
    # print(title)
    detail_url = 'http://sanguo.5000yan.com/' + li.a['href']
    print(detail_url)
    detail_page_text = requests.get(detail_url, headers=headers).content
    detail_soup = BeautifulSoup(detail_page_text, 'lxml')
    div_tag = detail_soup.find('div', class_="grap")
    content = div_tag.text
    fp.write(title + ":" + content + '\n')
    print(title, 'crawled successfully!!')
3. Xpath parsing basics
from lxml import etree
tree = etree.parse('Chapter 3 Data Analysis /text.html')
# r = tree.xpath('/html/head/title')
# print(r)
# r = tree.xpath('/html/body/div')
# print(r)
# r = tree.xpath('/html//div')
# print(r)
# r = tree.xpath('//div')
# print(r)
# r = tree.xpath('//div[@class="song"]')
# print(r)
# r = tree.xpath('//div[@class="song"]/P[3]')
# print(r)
# r = tree.xpath('//div[@class="tang"]//li[5]/a/text()')
# print(r)
# r = tree.xpath('//li[7]/i/text()')
# print(r)
# r = tree.xpath('//li[7]//text()')
# print(r)
# r = tree.xpath('//div[@class="tang"]//text()')
# print(r)
# r = tree.xpath('//div[@class="song"]/img/@src')
# print(r)
4. Xpath parsing case – 4K picture parsing crawl
import requests
from lxml import etree
import os

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = 'http://pic.netbian.com/4kmeinv/'
response = requests.get(url, headers=headers)
# response.encoding = response.apparent_encoding
# response.encoding = 'utf-8'
page_text = response.text
tree = etree.HTML(page_text)
li_list = tree.xpath('//div[@class="slist"]/ul/li')
if not os.path.exists('./picLibs'):
    os.mkdir('./picLibs')
for li in li_list:
    img_src = 'http://pic.netbian.com/' + li.xpath('./a/img/@src')[0]
    img_name = li.xpath('./a/img/@alt')[0] + '.jpg'
    # the page is gbk-encoded, so re-encode the name to display the Chinese characters correctly
    img_name = img_name.encode('iso-8859-1').decode('gbk')
    # print(img_name, img_src)
    # print(type(img_name))
    img_data = requests.get(url=img_src, headers=headers).content
    img_path = 'picLibs/' + img_name
    # print(img_path)
    with open(img_path, 'wb') as fp:
        fp.write(img_data)
    print(img_name, "downloaded successfully")
5. XPath parsing case – 58.com second-hand houses

import requests
from lxml import etree

url = 'https://bj.58.com/ershoufang/p2/'
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
page_text = requests.get(url=url, headers=headers).text
tree = etree.HTML(page_text)
li_list = tree.xpath('//section[@class="list-left"]/section[2]/div')
fp = open('58.txt', 'w', encoding='utf-8')
for li in li_list:
    title = li.xpath('./a/div[2]/div/div/h3/text()')[0]
    print(title)
    fp.write(title + '\n')
6. Xpath parsing examples – crawl free resume templates from webmaster materials
import requests
from lxml import etree
import os

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = 'https://www.aqistudy.cn/historydata/'
page_text = requests.get(url, headers=headers).text
7. Xpath parsing case – National city name crawl
import requests
from lxml import etree
import os

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = 'https://www.aqistudy.cn/historydata/'
page_text = requests.get(url, headers=headers).text
tree = etree.HTML(page_text)
# holt_li_list = tree.xpath('//div[@class="bottom"]/ul/li')
# all_city_name = []
# for li in holt_li_list:
#     host_city_name = li.xpath('./a/text()')[0]
#     all_city_name.append(host_city_name)
# city_name_list = tree.xpath('//div[@class="bottom"]/ul/div[2]/li')
# for li in city_name_list:
#     city_name = li.xpath('./a/text()')[0]
#     all_city_name.append(city_name)
# print(all_city_name, len(all_city_name))
# holt_li_list = tree.xpath('//div[@class="bottom"]/ul//li')
holt_li_list = tree.xpath('//div[@class="bottom"]/ul/li | //div[@class="bottom"]/ul/div[2]/li')
all_city_name = []
for li in holt_li_list:
    host_city_name = li.xpath('./a/text()')[0]
    all_city_name.append(host_city_name)
print(all_city_name, len(all_city_name))
8. Regex parsing

import requests
import re
import os

if not os.path.exists('./qiutuLibs'):
    os.mkdir('./qiutuLibs')
url = 'https://www.qiushibaike.com/imgrank/'
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4385.0 Safari/537.36'
}
page_text = requests.get(url, headers=headers).text
# capture the src of each thumbnail image
ex = '<div class="thumb">.*?<img src="(.*?)" alt.*?</div>'
img_src_list = re.findall(ex, page_text, re.S)
print(img_src_list)
for src in img_src_list:
    src = 'https:' + src
    img_data = requests.get(url=src, headers=headers).content
    img_name = src.split('/')[-1]
    imgPath = './qiutuLibs/' + img_name
    with open(imgPath, 'wb') as fp:
        fp.write(img_data)
    print(img_name, "download completed!!!")
9. Regex parsing – paginated crawl

import requests
import re
import os

if not os.path.exists('./qiutuLibs'):
    os.mkdir('./qiutuLibs')
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4385.0 Safari/537.36'
}
url = 'https://www.qiushibaike.com/imgrank/page/%d/'
for pageNum in range(1, 3):
    new_url = format(url % pageNum)
    page_text = requests.get(new_url, headers=headers).text
    # capture the src of each thumbnail image
    ex = '<div class="thumb">.*?<img src="(.*?)" alt.*?</div>'
    img_src_list = re.findall(ex, page_text, re.S)
    print(img_src_list)
    for src in img_src_list:
        src = 'https:' + src
        img_data = requests.get(url=src, headers=headers).content
        img_name = src.split('/')[-1]
        imgPath = './qiutuLibs/' + img_name
        with open(imgPath, 'wb') as fp:
            fp.write(img_data)
        print(img_name, "download completed!!!")
10. Crawl pictures
import requests

url = 'https://pic.qiushibaike.com/system/pictures/12404/124047919/medium/R7Y2UOCDRBXF2MIQ.jpg'
img_data = requests.get(url).content
with open('qiutu.jpg', 'wb') as fp:
    fp.write(img_data)
Chapter 4: Automatic identification of captcha
1. Gushiwen verification code recognition
The developer account and password can be applied for on the fateadm platform.
import requests
from lxml import etree
from fateadm_api import FateadmApi

def TestFunc(imgPath, codyType):
    pd_id = "xxxxxx"          # PD info can be found on the user-center page
    pd_key = "xxxxxxxx"
    app_id = "xxxxxxx"        # the shared developer account can be found in the developer center
    app_key = "xxxxxxx"
    # recognition type: check the price page on the official site for the specific
    # type codes; ask customer service if you are not sure
    pred_type = codyType
    api = FateadmApi(app_id, app_key, pd_id, pd_key)
    # check the balance
    balance = api.QueryBalcExtend()   # returns the balance directly
    # api.QueryBalc()
    # recognize from a file:
    file_name = imgPath
    # some site types need an extra src_url parameter; see the API docs:
    # http://docs.fateadm.com/web/#/1?page_id=6
    result = api.PredictFromFileExtend(pred_type, file_name)   # returns the recognition result directly
    # rsp = api.PredictFromFile(pred_type, file_name)          # returns the detailed result
    # If you are not recognizing from a file, call the Predict interface instead:
    # rsp = api.PredictExtend(pred_type, data)
    # just_flag = False
    # if just_flag:
    #     if rsp.ret_code == 0:
    #         # if the result does not match expectations, this call refunds the order;
    #         # refunds are only for normally recognized results that fail the site's check --
    #         # do not abuse this, or the account may be blocked
    #         api.Justice(rsp.request_id)
    # card_id = "123"
    # card_key = "123"
    # top up
    # api.Charge(card_id, card_key)
    # LOG("print in testfunc")
    return result

# if __name__ == "__main__":
#     TestFunc()

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = 'https://so.gushiwen.cn/user/login.aspx?from=http://so.gushiwen.cn/user/collect.aspx'
page_text = requests.get(url, headers=headers).text
tree = etree.HTML(page_text)
code_img_src = 'https://so.gushiwen.cn' + tree.xpath('//*[@id="imgCode"]/@src')[0]
img_data = requests.get(code_img_src, headers=headers).content
with open('./code.jpg', 'wb') as fp:
    fp.write(img_data)
code_text = TestFunc('code.jpg', 30400)
print('Recognition result is: ' + code_text)
fateadm_api.py (the configuration required for recognition; recommended to keep in the same folder) wraps the API calls
# coding=utf-8
import os,sys
import hashlib
import time
import json
import requests
FATEA_PRED_URL = "http://pred.fateadm.com"
def LOG(log):
    # comment out the log when testing is not needed
    print(log)
    log = None

class TmpObj():
    def __init__(self):
        self.value = None

class Rsp():
    def __init__(self):
        self.ret_code = -1
        self.cust_val = 0.0
        self.err_msg = "succ"
        self.pred_rsp = TmpObj()

    def ParseJsonRsp(self, rsp_data):
        if rsp_data is None:
            self.err_msg = "http request failed, get rsp Nil data"
            return
        jrsp = json.loads(rsp_data)
        self.ret_code = int(jrsp["RetCode"])
        self.err_msg = jrsp["ErrMsg"]
        self.request_id = jrsp["RequestId"]
        if self.ret_code == 0:
            rslt_data = jrsp["RspData"]
            if rslt_data is not None and rslt_data != "":
                jrsp_ext = json.loads(rslt_data)
                if "cust_val" in jrsp_ext:
                    data = jrsp_ext["cust_val"]
                    self.cust_val = float(data)
                if "result" in jrsp_ext:
                    data = jrsp_ext["result"]
                    self.pred_rsp.value = data

def CalcSign(pd_id, passwd, timestamp):
    md5 = hashlib.md5()
    md5.update((timestamp + passwd).encode())
    csign = md5.hexdigest()
    md5 = hashlib.md5()
    md5.update((pd_id + timestamp + csign).encode())
    csign = md5.hexdigest()
    return csign

def CalcCardSign(cardid, cardkey, timestamp, passwd):
    md5 = hashlib.md5()
    md5.update((passwd + timestamp + cardid + cardkey).encode())
    return md5.hexdigest()

def HttpRequest(url, body_data, img_data=""):
    rsp = Rsp()
    post_data = body_data
    files = {
        'img_data': ('img_data', img_data)
    }
    header = {
        'User-Agent': 'Mozilla/5.0',
    }
    rsp_data = requests.post(url, post_data, files=files, headers=header)
    rsp.ParseJsonRsp(rsp_data.text)
    return rsp
class FateadmApi():
    # API wrapper class
    # constructor parameters: (app_id, app_key, pd_id, pd_key)
    def __init__(self, app_id, app_key, pd_id, pd_key):
        self.app_id = app_id
        if app_id is None:
            self.app_id = ""
        self.app_key = app_key
        self.pd_id = pd_id
        self.pd_key = pd_key
        self.host = FATEA_PRED_URL

    def SetHost(self, url):
        self.host = url

    #
    # check balance
    # parameters: none
    # return value:
    #   rsp.ret_code: 0 on success
    #   rsp.cust_val: user balance
    #   rsp.err_msg: exception details when an error occurs
    #
    def QueryBalc(self):
        tm = str(int(time.time()))
        sign = CalcSign(self.pd_id, self.pd_key, tm)
        param = {
            "user_id": self.pd_id,
            "timestamp": tm,
            "sign": sign
        }
        url = self.host + "/api/custval"
        rsp = HttpRequest(url, param)
        if rsp.ret_code == 0:
            LOG("query succ ret: {} cust_val: {} rsp: {} pred: {}".format(rsp.ret_code, rsp.cust_val, rsp.err_msg, rsp.pred_rsp.value))
        else:
            LOG("query failed ret: {} err: {}".format(rsp.ret_code, rsp.err_msg.encode('utf-8')))
        return rsp

    #
    # query network latency
    # parameter: pred_type: recognition type
    # return value:
    #   rsp.ret_code: 0 on success
    #   rsp.err_msg: exception details when an error occurs
    #
    def QueryTTS(self, pred_type):
        tm = str(int(time.time()))
        sign = CalcSign(self.pd_id, self.pd_key, tm)
        param = {
            "user_id": self.pd_id,
            "timestamp": tm,
            "sign": sign,
            "predict_type": pred_type,
        }
        if self.app_id != "":
            asign = CalcSign(self.app_id, self.app_key, tm)
            param["appid"] = self.app_id
            param["asign"] = asign
        url = self.host + "/api/qcrtt"
        rsp = HttpRequest(url, param)
        if rsp.ret_code == 0:
            LOG("query rtt succ ret: {} request_id: {} err: {}".format(rsp.ret_code, rsp.request_id, rsp.err_msg))
        else:
            LOG("predict failed ret: {} err: {}".format(rsp.ret_code, rsp.err_msg.encode('utf-8')))
        return rsp
    #
    # recognize a verification code
    # parameters: pred_type: recognition type  img_data: image data
    # return value:
    #   rsp.ret_code: 0 on success
    #   rsp.request_id: unique order number
    #   rsp.pred_rsp.value: recognition result
    #   rsp.err_msg: exception details when an error occurs
    #
    def Predict(self, pred_type, img_data, head_info=""):
        tm = str(int(time.time()))
        sign = CalcSign(self.pd_id, self.pd_key, tm)
        param = {
            "user_id": self.pd_id,
            "timestamp": tm,
            "sign": sign,
            "predict_type": pred_type,
            "up_type": "mt"
        }
        if head_info is not None or head_info != "":
            param["head_info"] = head_info
        if self.app_id != "":
            asign = CalcSign(self.app_id, self.app_key, tm)
            param["appid"] = self.app_id
            param["asign"] = asign
        url = self.host + "/api/capreg"
        files = img_data
        rsp = HttpRequest(url, param, files)
        if rsp.ret_code == 0:
            LOG("predict succ ret: {} request_id: {} pred: {} err: {}".format(rsp.ret_code, rsp.request_id, rsp.pred_rsp.value, rsp.err_msg))
        else:
            LOG("predict failed ret: {} err: {}".format(rsp.ret_code, rsp.err_msg))
            if rsp.ret_code == 4003:
                # not enough balance
                LOG("cust_val <= 0 lack of money, please charge immediately")
        return rsp

    #
    # recognize a verification code from a file
    # parameters: pred_type: recognition type  file_name: file name
    # return value:
    #   rsp.ret_code: 0 on success
    #   rsp.request_id: unique order number
    #   rsp.pred_rsp.value: recognition result
    #   rsp.err_msg: exception details when an error occurs
    #
    def PredictFromFile(self, pred_type, file_name, head_info=""):
        with open(file_name, "rb") as f:
            data = f.read()
        return self.Predict(pred_type, data, head_info=head_info)
    #
    # request a refund when recognition fails
    # parameter: request_id: the order number to refund
    # return value:
    #   rsp.ret_code: 0 on success
    #   rsp.err_msg: exception details when an error occurs
    #
    # note 1:
    #   the Predict interface only charges when ret_code == 0, and only charged orders can be refunded
    # note 2:
    #   refunds are only for normally recognized results that fail the site's check;
    #   do not abuse this, or the account may be blocked
    #
    def Justice(self, request_id):
        if request_id == "":
            return
        tm = str(int(time.time()))
        sign = CalcSign(self.pd_id, self.pd_key, tm)
        param = {
            "user_id": self.pd_id,
            "timestamp": tm,
            "sign": sign,
            "request_id": request_id
        }
        url = self.host + "/api/capjust"
        rsp = HttpRequest(url, param)
        if rsp.ret_code == 0:
            LOG("justice succ ret: {} request_id: {} pred: {} err: {}".format(rsp.ret_code, rsp.request_id, rsp.pred_rsp.value, rsp.err_msg))
        else:
            LOG("justice failed ret: {} err: {}".format(rsp.ret_code, rsp.err_msg.encode('utf-8')))
        return rsp

    #
    # recharge interface
    # parameters: cardid: recharge card number  cardkey: recharge card signature string
    # return value:
    #   rsp.ret_code: 0 on success
    #   rsp.err_msg: exception details when an error occurs
    #
    def Charge(self, cardid, cardkey):
        tm = str(int(time.time()))
        sign = CalcSign(self.pd_id, self.pd_key, tm)
        csign = CalcCardSign(cardid, cardkey, tm, self.pd_key)
        param = {
            "user_id": self.pd_id,
            "timestamp": tm,
            "sign": sign,
            'cardid': cardid,
            'csign': csign
        }
        url = self.host + "/api/charge"
        rsp = HttpRequest(url, param)
        if rsp.ret_code == 0:
            LOG("charge succ ret: {} request_id: {} pred: {} err: {}".format(rsp.ret_code, rsp.request_id, rsp.pred_rsp.value, rsp.err_msg))
        else:
            LOG("charge failed ret: {} err: {}".format(rsp.ret_code, rsp.err_msg.encode('utf-8')))
        return rsp
    ##
    # recharge; only returns success or failure
    # parameters: cardid: recharge card number  cardkey: recharge card signature string
    # return value: 0 on success
    ##
    def ExtendCharge(self, cardid, cardkey):
        return self.Charge(cardid, cardkey).ret_code

    ##
    # request a refund; only returns success or failure
    # parameter: request_id: the order number to refund
    # return value: 0 on a successful refund
    #
    # note 1:
    #   the Predict interface only charges when ret_code == 0, and only charged orders can be refunded
    # note 2:
    #   refunds are only for normally recognized results that fail the site's check;
    #   do not abuse this, or the account may be blocked
    ##
    def JusticeExtend(self, request_id):
        return self.Justice(request_id).ret_code

    ##
    # query balance; only the balance is returned
    # parameters: none
    # return value: rsp.cust_val: balance
    ##
    def QueryBalcExtend(self):
        rsp = self.QueryBalc()
        return rsp.cust_val

    ##
    # recognize a code from a file; only the recognition result is returned
    # parameters: pred_type: recognition type  file_name: file name
    # return value: rsp.pred_rsp.value: recognition result
    ##
    def PredictFromFileExtend(self, pred_type, file_name, head_info=""):
        rsp = self.PredictFromFile(pred_type, file_name, head_info)
        return rsp.pred_rsp.value

    ##
    # recognition interface; only the recognition result is returned
    # parameters: pred_type: recognition type  img_data: image data
    # return value: rsp.pred_rsp.value: recognition result
    ##
    def PredictExtend(self, pred_type, img_data, head_info=""):
        rsp = self.Predict(pred_type, img_data, head_info)
        return rsp.pred_rsp.value
def TestFunc():
    pd_id = "128292"                          # PD info can be found on the user-center page
    pd_key = "bASHdc/12ISJOX7pV3qhPr2ntQ6QcEkV"
    app_id = "100001"                         # the shared developer account can be found in the developer center
    app_key = "123456"
    # recognition type: check the price page on the official site for the specific
    # type codes; ask customer service if you are not sure
    pred_type = "30400"
    api = FateadmApi(app_id, app_key, pd_id, pd_key)
    # check the balance
    balance = api.QueryBalcExtend()           # returns the balance directly
    # api.QueryBalc()
    # recognize from a file:
    file_name = 'img.gif'
    # some site types need an extra src_url parameter; see the API docs:
    # http://docs.fateadm.com/web/#/1?page_id=6
    # result = api.PredictFromFileExtend(pred_type, file_name)   # returns the recognition result directly
    rsp = api.PredictFromFile(pred_type, file_name)               # returns the detailed recognition result
    # If you are not recognizing from a file, call the Predict interface instead:
    # rsp = api.PredictExtend(pred_type, data)
    just_flag = False
    if just_flag:
        if rsp.ret_code == 0:
            # if the result does not match expectations, this call refunds the order;
            # refunds are only for normally recognized results that fail the site's check --
            # do not abuse this, or the account may be blocked
            api.Justice(rsp.request_id)
    # card_id = "123"
    # card_key = "123"
    # top up
    # api.Charge(card_id, card_key)
    LOG("print in testfunc")

if __name__ == "__main__":
    TestFunc()
Chapter 5: Advanced Request Module (Simulated Login)
1. Proxy operations
import requests

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = 'https://www.sogou.com/sie?query=ip'
page_text = requests.get(url, headers=headers, proxies={"https": "183.166.103.86:9999"}).text
with open('ip.html', 'w', encoding='utf-8') as fp:
    fp.write(page_text)
2. Simulated login on Renren
import requests
from lxml import etree
from fateadm_api import FateadmApi

def TestFunc(imgPath, codyType):
    pd_id = "xxxxx"           # PD info can be found on the user-center page
    pd_key = "xxxxxxxxxxxxxxxxxx"
    app_id = "xxxxxxxx"       # the shared developer account can be found in the developer center
    app_key = "xxxxxx"
    # recognition type: check the price page on the official site for the specific
    # type codes; ask customer service if you are not sure
    pred_type = codyType
    api = FateadmApi(app_id, app_key, pd_id, pd_key)
    # check the balance
    balance = api.QueryBalcExtend()   # returns the balance directly
    # recognize from a file:
    file_name = imgPath
    # some site types need an extra src_url parameter; see the API docs:
    # http://docs.fateadm.com/web/#/1?page_id=6
    result = api.PredictFromFileExtend(pred_type, file_name)   # returns the recognition result directly
    # rsp = api.PredictFromFile(pred_type, file_name)          # returns the detailed result
    # If you are not recognizing from a file, call the Predict interface instead:
    # rsp = api.PredictExtend(pred_type, data)
    # just_flag = False
    # if just_flag:
    #     if rsp.ret_code == 0:
    #         # if the result does not match expectations, this call refunds the order
    #         api.Justice(rsp.request_id)
    # card_id = "123"
    # card_key = "123"
    # api.Charge(card_id, card_key)
    # LOG("print in testfunc")
    return result

# if __name__ == "__main__":
#     TestFunc()

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = 'http://www.renren.com/'
page_text = requests.get(url, headers=headers).text
tree = etree.HTML(page_text)
code_img_src = tree.xpath('//*[@id="verifyPic_login"]/@src')[0]
code_img_data = requests.get(code_img_src, headers=headers).content
with open('./code.jpg', 'wb') as fp:
    fp.write(code_img_data)
result = TestFunc('code.jpg', 30600)
print('Recognition result is: ' + result)
login_url = 'http://www.renren.com/ajaxLogin/login?1=1&uniqueTimestamp=2021121720536'
data = {
    'email': 'xxxxxxxx',
    'icode': result,
    'origURL': 'http://www.renren.com/home',
    'domain': 'renren.com',
    'key_id': '1',
    'captcha_type': 'web_login',
    'password': '47e27dd5ef32b31041ebf56ec85a9b1e4233875e36396241c88245b188c56cdb',
    'rkey': 'c655ef0c57a72755f1240d6c0efac67d',
    'f': ''
}
response = requests.post(login_url, headers=headers, data=data)
print(response.status_code)
with open('renren.html', 'w', encoding='utf-8') as fp:
    fp.write(response.text)
fateadm_api.py: identical to the listing given in Chapter 4; keep a copy in the same folder as the login script.
3. Crawl the profile page data of the logged-in Renren user

import requests
from lxml import etree
from fateadm_api import FateadmApi

def TestFunc(imgPath, codyType):
    pd_id = "xxxxxxx"         # PD info can be found on the user-center page
    pd_key = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    app_id = "xxxxxxxx"       # the shared developer account can be found in the developer center
    app_key = "xxxxxxxxx"
    # recognition type: check the price page on the official site for the specific
    # type codes; ask customer service if you are not sure
    pred_type = codyType
    api = FateadmApi(app_id, app_key, pd_id, pd_key)
    # check the balance
    balance = api.QueryBalcExtend()   # returns the balance directly
    # recognize from a file:
    file_name = imgPath
    # some site types need an extra src_url parameter; see the API docs:
    # http://docs.fateadm.com/web/#/1?page_id=6
    result = api.PredictFromFileExtend(pred_type, file_name)   # returns the recognition result directly
    return result

# if __name__ == "__main__":
#     TestFunc()

# a session keeps the login cookies for the later profile-page request
session = requests.Session()
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = 'http://www.renren.com/'
page_text = requests.get(url, headers=headers).text
tree = etree.HTML(page_text)
code_img_src = tree.xpath('//*[@id="verifyPic_login"]/@src')[0]
code_img_data = requests.get(code_img_src, headers=headers).content
with open('./code.jpg', 'wb') as fp:
    fp.write(code_img_data)
result = TestFunc('code.jpg', 30600)
print('Recognition result is: ' + result)
login_url = 'http://www.renren.com/ajaxLogin/login?1=1&uniqueTimestamp=2021121720536'
data = {
    'email': '15893301681',
    'icode': result,
    'origURL': 'http://www.renren.com/home',
    'domain': 'renren.com',
    'key_id': '1',
    'captcha_type': 'web_login',
    'password': '47e27dd5ef32b31041ebf56ec85a9b1e4233875e36396241c88245b188c56cdb',
    'rkey': 'c655ef0c57a72755f1240d6c0efac67d',
    'f': ''
}
response = session.post(login_url, headers=headers, data=data)
print(response.status_code)
with open('renren.html', 'w', encoding='utf-8') as fp:
    fp.write(response.text)
# headers = {
#     'Cookies': ''
# }
detail_url = 'http://www.renren.com/975996803/profile'
detail_page_text = session.get(detail_url, headers=headers).text
with open('bobo.html', 'w', encoding='utf-8') as fp:
    fp.write(detail_page_text)
Chapter 6: High-performance Asynchronous crawlers (Thread Pools, Coroutines)
1. Aiohttp implements multi-task asynchronous crawler
import requests
import asyncio
import time
import aiohttp

start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom'
]

async def get_page(url):
    # print('downloading', url)
    # response = requests.get(url)
    # print('download done', response.text)
    async with aiohttp.ClientSession() as session:
        async with await session.get(url) as response:
            page_text = await response.text()
            print(page_text)

tasks = []
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Total time', end - start)
2. Flask test server
from flask import Flask
import time

app = Flask(__name__)

@app.route('/bobo')
def index_bobo():
    time.sleep(2)
    return 'Hello bobo'

@app.route('/jay')
def index_jay():
    time.sleep(2)
    return 'Hello jay'

@app.route('/tom')
def index_tom():
    time.sleep(2)
    return 'Hello tom'

if __name__ == '__main__':
    app.run(threaded=True)
3. Multitasking coroutines
import asyncio
import time

async def request(url):
    print('Downloading', url)
    # time.sleep(2)  # a blocking call would not yield control to other coroutines
    await asyncio.sleep(2)
    print('Download completed', url)

start = time.time()
urls = ['www.baidu.com', 'www.sogou.com', 'www.goubanjia.com']
stasks = []
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    stasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(stasks))
print(time.time() - start)
4. Multi-task asynchronous crawler
import requests
import asyncio
import time
# import aiohttp

start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom'
]
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}

async def get_page(url):
    print('Downloading', url)
    # requests is synchronous, so these coroutines still run one after another;
    # the aiohttp version in section 1 is the truly asynchronous one
    response = requests.get(url, headers=headers)
    print('Download complete', response.text)

tasks = []
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Total time', end - start)
5. Example

import requests

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = 'https://www.pearvideo.com/videoStatus.jsp?contId=1719770&mrd=0.559512982919081'
response = requests.get(url, headers=headers)
print(response.text)
# sample video address returned in the JSON:
# https://video.pearvideo.com/mp4/short/20210209/1613307944808-15603370-hd.mp4
6. Synchronous crawler
import requests

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
urls = [
    'https://www.cnblogs.com/shaozheng/p/12795953.html',
    'https://www.cnblogs.com/hanfe1/p/12661505.html',
    'https://www.cnblogs.com/tiger666/articles/11070427.html'
]

def get_content(url):
    print('Crawling:', url)
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.content

def parse_content(content):
    print('The length of the response data is:', len(content))

for url in urls:
    content = get_content(url)
    parse_content(content)
7. Basic use of thread pools
# import time
# def get_page(str):
#     print('Downloading:', str)
#     time.sleep(2)
#     print('Download successful:', str)
# name_list = ['xiaozi', 'aa', 'bb', 'cc']
# start_time = time.time()
# for i in range(len(name_list)):
#     get_page(name_list[i])
# end_time = time.time()
# print('%d second' % (end_time - start_time))

import time
from multiprocessing.dummy import Pool

start_time = time.time()

def get_page(str):
    print('Downloading:', str)
    time.sleep(2)
    print('Download successful:', str)

name_list = ['xiaozi', 'aa', 'bb', 'cc']
pool = Pool(4)
pool.map(get_page, name_list)
end_time = time.time()
print(end_time - start_time)
8. Application of thread pool in crawler case
import requests
from lxml import etree
import re
from multiprocessing.dummy import Pool

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
url = 'https://www.pearvideo.com/'
page_text = requests.get(url, headers=headers).text
tree = etree.HTML(page_text)
li_list = tree.xpath('//div[@class="vervideo-tlist-bd recommend-btbg clearfix"]/ul/li')
# li_list = tree.xpath('//ul[@class="vervideo-tlist-small"]/li')
urls = []
for li in li_list:
    detail_url = 'https://www.pearvideo.com/' + li.xpath('./div/a/@href')[0]
    # name = li.xpath('./div/a/div[2]/text()')[0] + '.mp4'
    name = li.xpath('./div/a/div[2]/div[2]/text()')[0] + '.mp4'
    # print(detail_url, name)
    detail_page_text = requests.get(detail_url, headers=headers).text
    # ex = 'srcUrl="(.*?)",vdoUrl'
    # video_url = re.findall(ex, detail_page_text)[0]
    # video_url = tree.xpath('//img[@class="img"]/@src')[0]
    # https://video.pearvideo.com/mp4/short/20210209/{}-15603370-hd.mp4
    # the video address is loaded via XHR, so it is no longer in the detail page source
    print(detail_page_text)
'''
    dic = {
        'name': name,
        'url': video_url
    }
    urls.append(dic)

def get_video_data(dic):
    url = dic['url']
    print(dic['name'], 'downloading......')
    data = requests.get(url, headers=headers).content
    with open(dic['name'], 'wb') as fp:
        fp.write(data)
    print(dic['name'], 'downloaded successfully!')

pool = Pool(4)
pool.map(get_video_data, urls)
pool.close()
pool.join()
'''
9. Coroutines
import asyncio

async def request(url):
    print('The URL being requested is', url)
    print('Request successful,', url)
    return url

c = request('www.baidu.com')

# loop = asyncio.get_event_loop()
# loop.run_until_complete(c)

# loop = asyncio.get_event_loop()
# task = loop.create_task(c)
# print(task)
# loop.run_until_complete(task)
# print(task)

# loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(c)
# print(task)
# loop.run_until_complete(task)
# print(task)

def callback_func(task):
    print(task.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(c)
task.add_done_callback(callback_func)
loop.run_until_complete(task)
Chapter 7: Dynamic Loading data Processing (Selenium Module Application, Simulated login 12306)
1. Selenium basics
from selenium import webdriver
from lxml import etree
from time import sleep

bro = webdriver.Chrome(executable_path='chromedriver.exe')
bro.get('http://scxk.nmpa.gov.cn:81/xk/')
page_text = bro.page_source
tree = etree.HTML(page_text)
li_list = tree.xpath('//ul[@id="gzlist"]/li')
for li in li_list:
    name = li.xpath('./dl/@title')[0]
    print(name)
sleep(5)
bro.quit()
2. Selenium other automatic operations
from selenium import webdriver
from lxml import etree
from time import sleep

bro = webdriver.Chrome()
bro.get('https://www.taobao.com/')
sleep(2)
search_input = bro.find_element_by_xpath('//*[@id="q"]')
search_input.send_keys('Iphone')
sleep(2)
# bro.execute_async_script('window.scrollTo(0,document.body.scrollHeight)')
# sleep(5)
btn = bro.find_element_by_xpath('//*[@id="J_TSearchForm"]/div[1]/button')
print(type(btn))
btn.click()
bro.get('https://www.baidu.com')
sleep(2)
bro.back()
sleep(2)
bro.forward()
sleep(5)
bro.quit()
3. 12306 example login code

# sophomore year
# February 18, 2021 (winter vacation; school starts on March 7)
from selenium import webdriver
import time
from PIL import Image
from selenium.webdriver.chrome.options import Options
from selenium.webdriver import ChromeOptions
from selenium.webdriver import ActionChains

# chrome_options = Options()
# chrome_options.add_argument('--headless')
# chrome_options.add_argument('--disable-gpu')
bro = webdriver.Chrome()
bro.maximize_window()
time.sleep(5)
# option = ChromeOptions()
# option.add_experimental_option('excludeSwitches', ['enable-automation'])
# bro = webdriver.Chrome(chrome_options=chrome_options)
# chrome_options.add_argument("window-size=1920,1050")
# bro = webdriver.Chrome(chrome_options=chrome_options, options=option)
bro.get('https://kyfw.12306.cn/otn/resources/login.html')
time.sleep(3)
bro.find_element_by_xpath('/html/body/div[2]/div[2]/ul/li[2]/a').click()
bro.save_screenshot('aa.png')
time.sleep(2)
code_img_ele = bro.find_element_by_xpath('//*[@id="J-loginImg"]')
time.sleep(2)
location = code_img_ele.location
print('location:', location)
size = code_img_ele.size
print('size', size)
rangle = (
    int(location['x']), int(location['y']),
    int(location['x']) + int(size['width']), int(location['y']) + int(size['height'])
)
print(rangle)
i = Image.open('./aa.png')
code_img_name = './code.png'
frame = i.crop(rangle)
frame.save(code_img_name)
# bro.quit()

# February 19, 2021
# The captcha coordinates could not be located accurately (they are offset);
# with a headless browser the captcha can be recognized correctly.
'''
result = chaojiying.PostPic(im, 9004)['pic_str']
all_list = []
if '|' in result:
    list_1 = result.split('|')
    count_1 = len(list_1)
    for i in range(count_1):
        xy_list = []
        x = int(list_1[i].split(',')[0])
        y = int(list_1[i].split(',')[1])
        xy_list.append(x)
        xy_list.append(y)
        all_list.append(xy_list)
else:
    xy_list = []
    x = int(result.split(',')[0])
    y = int(result.split(',')[1])
    xy_list.append(x)
    xy_list.append(y)
    all_list.append(xy_list)
print(all_list)
for l in all_list:
    x = l[0]
    y = l[1]
    ActionChains(bro).move_to_element_with_offset(code_img_ele, x, y).click().perform()
    time.sleep(0.5)
bro.find_element_by_id('J-userName').send_keys('')
time.sleep(2)
bro.find_element_by_id('J-password').send_keys('')
time.sleep(2)
bro.find_element_by_id('J-login').click()
bro.quit()
'''
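The offset described above usually comes from the browser window being scaled, so the element coordinates reported by Selenium no longer match the pixels of the screenshot. Besides switching to a headless browser, one possible workaround (a sketch of an alternative, not taken from the original notes) is to read the page's device pixel ratio and scale the crop rectangle accordingly:

# Hypothetical workaround: scale the crop box by the device pixel ratio.
from PIL import Image

# how many screenshot pixels correspond to one CSS pixel
scale = bro.execute_script("return window.devicePixelRatio")

location = code_img_ele.location
size = code_img_ele.size
rangle = (
    int(location['x'] * scale),
    int(location['y'] * scale),
    int((location['x'] + size['width']) * scale),
    int((location['y'] + size['height']) * scale)
)
Image.open('./aa.png').crop(rangle).save('./code.png')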
4. Processing of action chain and IFrame
from selenium import webdriver
from time import sleep
from selenium.webdriver import ActionChains

bro = webdriver.Chrome()
bro.get('https://www.runoob.com/try/try.php?filename=jqueryui-api-droppable')
bro.switch_to.frame('id')              # switch into the iframe that contains the draggable element (fill in its id)
div = bro.find_element_by_id(' ')      # the element to drag (fill in its id)
action = ActionChains(bro)
action.click_and_hold(div)
for i in range(5):
    # drag 17 px to the right each step
    action.move_by_offset(17, 0).perform()
    sleep(0.3)
action.release()
print(div)
5. Chrome headless browser + evading detection

from selenium import webdriver
from time import sleep
from selenium.webdriver.chrome.options import Options
from selenium.webdriver import ChromeOptions

chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')
option = ChromeOptions()
option.add_experimental_option('excludeSwitches', ['enable-automation'])
bro = webdriver.Chrome(chrome_options=chrome_options, options=option)
bro.get('https://www.baidu.com')
print(bro.page_source)
sleep(2)
bro.quit()
6. Implement 12306 simulated login based on Selenium

# February 18, 2021
import requests
from hashlib import md5

class Chaojiying_Client(object):

    def __init__(self, username, password, soft_id):
        self.username = username
        password = password.encode('utf8')
        self.password = md5(password).hexdigest()
        self.soft_id = soft_id
        self.base_params = {
            'user': self.username,
            'pass2': self.password,
            'softid': self.soft_id,
        }
        self.headers = {
            'Connection': 'Keep-Alive',
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0)',
        }

    def PostPic(self, im, codetype):
        """im: image bytes  codetype: captcha type, see http://www.chaojiying.com/price.html"""
        params = {
            'codetype': codetype,
        }
        params.update(self.base_params)
        files = {'userfile': ('ccc.jpg', im)}
        r = requests.post('http://upload.chaojiying.net/Upload/Processing.php', data=params, files=files, headers=self.headers)
        return r.json()

    def ReportError(self, im_id):
        """im_id: the id of the image whose recognition result was wrong"""
        params = {
            'id': im_id,
        }
        params.update(self.base_params)
        r = requests.post('http://upload.chaojiying.net/Upload/ReportError.php', data=params, headers=self.headers)
        return r.json()

# if __name__ == '__main__':
#     chaojiying = Chaojiying_Client('username', 'password', '96001')
#     im = open('a.jpg', 'rb').read()
#     print(chaojiying.PostPic(im, 1902))

# chaojiying = Chaojiying_Client('xxxxxxxxxx', 'xxxxxxxxxx', 'xxxxxxx')
# im = open('Chapter 7 Dynamic loading data processing/12306.jpg', 'rb').read()
# print(chaojiying.PostPic(im, 9004)['pic_str'])

from selenium import webdriver
import time

bro = webdriver.Chrome()
bro.get('https://kyfw.12306.cn/otn/resources/login.html')
time.sleep(3)
bro.find_element_by_xpath('/html/body/div[2]/div[2]/ul/li[2]/a').click()
7. Simulate logging in to Qzone
from selenium import webdriver
from selenium.webdriver import ActionChains
from time import sleep

bro = webdriver.Chrome()
bro.get('https://qzone.qq.com/')
bro.switch_to.frame('login_frame')
bro.find_element_by_id('switcher_plogin').click()
# account = input('Please enter the account number:')
bro.find_element_by_id('u').send_keys(' ')
# password = input('Please enter the password:')
bro.find_element_by_id('p').send_keys(' ')
bro.find_element_by_id('login_button').click()
Chapter 8: Scrapy Framework
1. Project practice and Scrapy configuration changes 2. BossPro example
# sophomore year
# Tuesday, February 23, 2021 (winter vacation; school starts on March 7)
import requests
from lxml import etree

# url = 'https://www.zhipin.com/c101010100/?query=python&ka=sel-city-101010100'
url = 'https://www.zhipin.com/c101120100/b_%E9%95%BF%E6%B8%85%E5%8C%BA/?ka=sel-business-5'
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36'
}
page_text = requests.get(url, headers=headers).text
tree = etree.HTML(page_text)
print(tree)
li_list = tree.xpath('//*[@id="main"]/div/div[2]/ul/li')
print(li_list)
for li in li_list:
    job_name = li.xpath('.//span[@class="job-name"]/a/text()')
    print(job_name)
3. QiubaiPro example
# -*- coding: utf-8 -*-
# sophomore year
# Sunday, February 21, 2021 (winter vacation; school starts on March 7)
import requests
from lxml import etree

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36'
}
url = 'https://www.qiushibaike.com/text/'
page_text = requests.get(url, headers=headers).text
tree = etree.HTML(page_text)
div_list = tree.xpath('//div[@id="content"]/div[1]/div[2]/div')
print(div_list)
# print(tree.xpath('//*[@id="qiushi_tag_124072337"]/a[1]/div/span//text()'))
for div in div_list:
    auther = div.xpath('./div[1]/a[2]/h2/text()')[0]
    # print(auther)
    content = div.xpath('./a[1]/div/span//text()')
    content = ' '.join(content)
    # content = div.xpath('//*[@id="qiushi_tag_124072337"]/a[1]/div/span')
    # print(content)
    print(auther, content)
    # print(tree.xpath('//*[@id="qiushi_tag_124072337"]/div[1]/a[2]/h2/text()'))
4. Database example
# sophomore year
# Sunday, February 21, 2021 (winter vacation; school starts on March 7)
import pymysql

# connect to the database
# argument 1: IP address of the host running the MySQL server
# argument 2: user name
# argument 3: password
# argument 4: the database to connect to
# db = pymysql.connect("localhost", "root", "200829", "wj")
db = pymysql.connect("192.168.31.19", "root", "200829", "wj")
# create a cursor object
cursor = db.cursor()
sql = "select version()"
# execute the SQL statement
cursor.execute(sql)
# fetch the returned information
data = cursor.fetchone()
print(data)
# close the connection
cursor.close()
db.close()
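The example above only checks the server version; a minimal sketch of actually storing crawled results with pymysql follows (the table job_info and its single name column are assumptions for illustration, not from the original notes):

import pymysql

# same connection parameters as in the example above
db = pymysql.connect("192.168.31.19", "root", "200829", "wj")
cursor = db.cursor()

# assumed table: CREATE TABLE job_info (name VARCHAR(255));
crawled_names = ['python backend developer', 'data analyst']   # stand-in for crawled items
for name in crawled_names:
    # parameterized SQL avoids manual string escaping
    cursor.execute("INSERT INTO job_info (name) VALUES (%s)", (name,))
db.commit()   # writes are persisted only after commit

cursor.close()
db.close()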