Without using any crawler framework, I built a distributed crawler system by learning from many sources; it can save data to different stores, such as MySQL and HBase.

Because the system is developed with interface-oriented programming in mind, it has a certain degree of extensibility; interested readers can look directly at the code to understand its design ideas.

Although much of the code is currently tightly coupled, much of it could be extracted and made configurable with some time and effort.

Due to time constraints, I only wrote crawlers for JD.com and Suning, but the system fully supports random scheduling of crawlers for different websites. Based on its code structure, it would not be difficult to add crawlers for Gome, Tmall and other sites, though it would take some time and effort.

That effort mostly goes into parsing the page data. For example, when I was crawling prices on Suning, the price was fetched asynchronously and its API was a long string of numbers; it took me several hours to discover the pattern. Of course, I admit I am not experienced enough.

Beyond basic data crawling, the design of this system pays more attention to the following aspects:

How to make the crawler distributed?

How to implement random round-robin URL scheduling?

How to periodically add seed URLs to the URL repository?

How to monitor the crawler node processes and send email alerts?

How to implement a random IP proxy library?


The following is an overall introduction to the system. I have written very detailed comments in the code, which interested readers can refer to, and at the end I give some analysis of the data I crawled.

Note also that the crawler system is implemented in Java, but the language itself is not the most important thing; interested readers can try implementing it in Python.

Distributed crawler architecture

The overall system architecture is as follows:



As can be seen from the above architecture, the whole system is mainly divided into three parts:

The crawler system



URL scheduling system



Monitoring and alarm system


The crawler system is used to crawl data. Because the system is designed to be distributed, the crawler itself can run on different server nodes.

The core of the URL scheduling system is the URL repository. The so-called URL repository simply uses Redis to save the list of URLs to be crawled, and URLs are consumed from it according to certain strategies in our URL scheduler. From this perspective, the URL repository is also a URL queue.

The monitoring and alarm system mainly monitors the crawler nodes. Although the failure of one of the parallel crawler nodes has no impact on overall data crawling (it only reduces the crawling speed), we would still rather receive a notification of a node failure actively than discover it passively.

The following sections cover these three aspects, combined with some code snippets, to introduce the design ideas of the whole system.

The crawler system



The crawler system is an independently running process. We package the crawler into a JAR and distribute it to different nodes for execution, so that data can be crawled in parallel and crawling efficiency improves. (Note: ZooKeeper monitoring belongs to the monitoring and alarm system; the URL scheduler belongs to the URL scheduling system.)

Random IP proxy

Random IP proxies are added mainly to counter anti-crawler measures, so it helps to have an IP proxy library and to randomly use a different proxy each time an HTTP client is built.

To use the IP proxy library in the system, add the available proxy address information to the text file first:



# IPProxyRepository.txt

58.60.255.104:8118

219.135.164.245:3128

27.44.171.27:9999

219.135.164.245:3128

58.60.255.104:8118

58.252.6.165:9000

...

Note that the proxy IPs above were collected from a free proxy site (Xicun) and may no longer be available. It is better to buy a batch of proxy IPs yourself, which saves a lot of time and effort spent hunting for working proxies.

Then, in the utility class that builds the HTTP client, these proxy IPs are loaded into memory, into a Java HashMap, the first time the class is used:


// proxy repository map: host -> port
private static Map<String, Integer> IPProxyRepository = new HashMap<>();
private static String[] keysArray = null;   // keysArray is used to pick a random proxy

/**
 * Static block: load the IP proxy repository into the map the first time the class is used
 */
static {
    InputStream in = HttpUtil.class.getClassLoader().getResourceAsStream("IPProxyRepository.txt"); // load the file containing the proxy IPs
    // build the buffered reader
    InputStreamReader isr = new InputStreamReader(in);
    BufferedReader bfr = new BufferedReader(isr);
    String line = null;
    try {
        // read line by line and add each entry to the map
        while ((line = bfr.readLine()) != null) {
            String[] split = line.split(":"); // ":" is the delimiter, i.e. each line looks like 192.168.1.1:4893
            String host = split[0];
            int port = Integer.valueOf(split[1]);
            IPProxyRepository.put(host, port);
        }
        Set<String> keys = IPProxyRepository.keySet();
        keysArray = keys.toArray(new String[keys.size()]); // keysArray is used to pick a random proxy
    } catch (IOException e) {
        e.printStackTrace();
    }
}

After that, every time we build an HTTP client we first check the map: if it contains proxies we use one of them, otherwise no proxy is set:


CloseableHttpClient httpClient = null;
HttpHost proxy = null;
if (IPProxyRepository.size() > 0) { // if the IP proxy repository is not empty, set a proxy
    proxy = getRandomProxy();
    httpClient = HttpClients.custom().setProxy(proxy).build(); // create an HttpClient object with the proxy
} else {
    httpClient = HttpClients.custom().build(); // create a plain HttpClient object
}
HttpGet request = new HttpGet(url); // build an HTTP GET request
...

Random proxy objects are generated using the following method:


/**
 * Returns a random proxy object
 *
 * @return
 */
public static HttpHost getRandomProxy() {
    // randomly pick a host:port pair and build the proxy object
    Random random = new Random();
    String host = keysArray[random.nextInt(keysArray.length)];
    int port = IPProxyRepository.get(host);
    HttpHost proxy = new HttpHost(host, port); // set the HTTP proxy
    return proxy;
}

With the above design, the random IP proxy function is basically implemented, although there are many places that could be improved.

For example, when a request fails with a given proxy IP, could that be recorded? After a certain number of failures the IP could be removed from the proxy library and a log entry generated for developers or operations to review; I have not implemented that here.
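As a rough illustration, a failure counter could be kept per proxy and the proxy dropped from the repository after a few failures. The sketch below only shows the idea and would live in the same HttpUtil class; the counter, the threshold and the method name are assumptions, not code from the project:

// hypothetical sketch: evict a proxy after too many failures (not part of the original project)
private static final Map<String, Integer> failureCounts = new ConcurrentHashMap<>();
private static final int MAX_FAILURES = 3; // assumed threshold

public static synchronized void reportProxyFailure(String host) {
    int failures = failureCounts.merge(host, 1, Integer::sum);
    if (failures >= MAX_FAILURES) {
        IPProxyRepository.remove(host);                                // drop the unreliable proxy
        failureCounts.remove(host);
        keysArray = IPProxyRepository.keySet().toArray(new String[0]); // rebuild the array used for random picks
        System.err.println("Removed unreliable proxy: " + host);       // or log it for ops to review
    }
}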

Web page downloader

The web page downloader is responsible for downloading the page data. It is developed mainly against the following interface:


/**
 * Web page data download
 */
public interface IDownload {
    /**
     * Download the page data for the given URL
     * @param url
     * @return
     */
    public Page download(String url);
}

Based on this interface, only an HTTP GET downloader is implemented in the system, but it is enough for what we need:


/**
 * Data download implementation class
 */
public class HttpGetDownloadImpl implements IDownload {

    @Override
    public Page download(String url) {
        Page page = new Page();
        String content = HttpUtil.getHttpContent(url); // fetch the page data
        page.setUrl(url);
        page.setContent(content);
        return page;
    }
}
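HttpUtil.getHttpContent is the method that ties the proxy handling above to an actual request. Its original implementation is not reproduced in this article; the following is only a minimal sketch of what it might look like, reusing the Apache HttpClient calls already shown, so treat the details as assumptions:

// hedged sketch of HttpUtil.getHttpContent, not the original implementation
public static String getHttpContent(String url) {
    CloseableHttpClient httpClient;
    if (IPProxyRepository.size() > 0) {
        httpClient = HttpClients.custom().setProxy(getRandomProxy()).build();
    } else {
        httpClient = HttpClients.custom().build();
    }
    HttpGet request = new HttpGet(url);
    try (CloseableHttpResponse response = httpClient.execute(request)) {
        return EntityUtils.toString(response.getEntity(), "UTF-8"); // the page HTML as a string
    } catch (IOException e) {
        e.printStackTrace();
        return null; // callers should handle a null page
    } finally {
        try { httpClient.close(); } catch (IOException ignored) { }
    }
}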

Web page parser

The web page parser extracts the data we are interested in from the downloaded page and saves it into an object, so that the data store can later persist it to the different back ends. It is developed against the following interface:


/**
 * Web page data parsing
 */
public interface IParser {
    public void parser(Page page);
}

The web page parser is one of the more important components to develop in the whole system. Its logic is not complex, but there is a lot of code: different malls and different products may need different parsers.

Parsers therefore have to be developed per mall, because JD.com's page template is obviously different from Suning's, and Tmall's is different from JD.com's.

So parsers are developed entirely according to your own needs. One thing worth mentioning is that during parser development you will find some duplicated code, which can be extracted into a utility class.

The system currently crawls mobile phone product data from JD.com and Suning, so these two implementation classes have been written:


/**
 * Parser implementation class for JD.com products
 */
public class JDHtmlParserImpl implements IParser {
    ...
}

/**
 * Parser implementation class for Suning products
 */
public class SNHtmlParserImpl implements IParser {
    ...
}
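The parser bodies are omitted above. To give a feel for their structure, here is a hedged sketch of how a parser might distinguish list pages from product pages and collect product URLs for the scheduler; it assumes Jsoup for HTML parsing and uses invented CSS selectors, so it is not the project's actual JD.com or Suning parser:

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;

// hypothetical parser sketch; the selectors and the list-page check are assumptions
public class ExampleHtmlParserImpl implements IParser {

    @Override
    public void parser(Page page) {
        Document doc = Jsoup.parse(page.getContent(), page.getUrl());
        if (page.getUrl().contains("list")) {                            // assumed way to recognize a list page
            // list page: collect product URLs so they can be pushed into the low-priority queue
            for (Element link : doc.select("ul.product-list a[href]")) { // selector is made up
                page.getUrls().add(link.absUrl("href"));
            }
        } else {
            // product page: fill in the fields of the Page object
            page.setTitle(doc.select("div.product-title").text());       // selector is made up
            page.setSource("jd.com");
        }
    }
}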

Data storage




The data store saves the data objects produced by the web page parser into different tables. For the mobile phone products crawled this time, the data object is the following Page object:


/**
 * Page object, holding the page content and the product data
 */
public class Page {
    private String content;      // page content

    private String id;           // product id
    private String source;       // product source (site)
    private String brand;        // product brand
    private String title;        // product title
    private float price;         // product price
    private int commentCount;    // number of product reviews
    private String url;          // product URL
    private String imgUrl;       // product image URL
    private String params;       // product specifications

    private List<String> urls = new ArrayList<>(); // holds the product URLs parsed out of a list page
}

In MySQL, the table structure is as follows:


-- ----------------------------
-- Table structure for phone
-- ----------------------------
DROP TABLE IF EXISTS `phone`;
CREATE TABLE `phone` (
  `id` varchar(30) CHARACTER SET armscii8 NOT NULL COMMENT 'product id',
  `source` varchar(30) NOT NULL COMMENT 'product source (site)',
  `brand` varchar(30) DEFAULT NULL COMMENT 'product brand',
  `title` varchar(255) DEFAULT NULL COMMENT 'product title',
  `price` float(10,2) DEFAULT NULL COMMENT 'product price',
  `comment_count` varchar(30) DEFAULT NULL COMMENT 'number of reviews',
  `url` varchar(500) DEFAULT NULL COMMENT 'product url',
  `img_url` varchar(500) DEFAULT NULL COMMENT 'product image url',
  `params` text COMMENT 'phone parameters',
  PRIMARY KEY (`id`,`source`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

The table structure in HBase is as follows:


## cf1 stores the id source price comment brand url fields
## cf2 stores the title params imgUrl fields
create 'phone', 'cf1', 'cf2'

# check the created table in the HBase shell
hbase(main):135:0> desc 'phone'
Table phone is ENABLED
phone
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK
_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>
'65536', REPLICATION_SCOPE => '0'}
{NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK
_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>
'65536', REPLICATION_SCOPE => '0'}
2 row(s) in 0.0350 seconds

That is, two column families are created in HBase: cf1 stores the id, source, price, comment, brand and url fields, and cf2 stores the title, params and imgUrl fields.

Different data stores use different implementation classes, but they are all developed based on the same interface:


/**
 * Storage of product data
 */
public interface IStore {
    public void store(Page page);
}

Based on this interface, a MySQL storage implementation, an HBase storage implementation and a console output implementation were developed. The MySQL implementation, for example, is just a simple insert statement:


/**
 * Write data into the mysql table, using the DBCP database connection pool
 */
public class MySQLStoreImpl implements IStore {
    private QueryRunner queryRunner = new QueryRunner(DBCPUtil.getDataSource());

    @Override
    public void store(Page page) {
        String sql = "insert into phone(id, source, brand, title, price, comment_count, url, img_url, params) values(?, ?, ?, ?, ?, ?, ?, ?, ?)";
        try {
            queryRunner.update(sql, page.getId(),
                    page.getSource(),
                    page.getBrand(),
                    page.getTitle(),
                    page.getPrice(),
                    page.getCommentCount(),
                    page.getUrl(),
                    page.getImgUrl(),
                    page.getParams());
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
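The console output implementation mentioned above is not shown in the original text; a minimal sketch of such a class (useful mainly for debugging) might be nothing more than:

// assumed shape of the console output store; the real class may differ
public class ConsoleStoreImpl implements IStore {
    @Override
    public void store(Page page) {
        System.out.println(page.getId() + "\t" + page.getSource() + "\t"
                + page.getTitle() + "\t" + page.getPrice());
    }
}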

The HBase storage implementation class is just the usual insert code written with the HBase Java API:


...
// cf1:price
Put pricePut = new Put(rowKey);
// each field needs a null check before it is written
pricePut.addColumn(cf1, "price".getBytes(), page.getPrice() != null ? String.valueOf(page.getPrice()).getBytes() : "".getBytes());
puts.add(pricePut);
// cf1:comment
Put commentPut = new Put(rowKey);
commentPut.addColumn(cf1, "comment".getBytes(), page.getCommentCount() != null ? String.valueOf(page.getCommentCount()).getBytes() : "".getBytes());
puts.add(commentPut);
// cf1:brand
Put brandPut = new Put(rowKey);
brandPut.addColumn(cf1, "brand".getBytes(), page.getBrand() != null ? page.getBrand().getBytes() : "".getBytes());
puts.add(brandPut);
...
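For context, the puts built above are eventually written to the phone table in one call. The following is only a hedged sketch of the surrounding code, assuming the standard HBase client API and a row key of the form id_source (as suggested by the count output shown later); it is not the project's exact implementation:

// hedged sketch of the code around the puts; row-key scheme and connection handling are assumptions
public class HBaseStoreImpl implements IStore {
    private static final byte[] cf1 = "cf1".getBytes();
    private static final byte[] cf2 = "cf2".getBytes();

    @Override
    public void store(Page page) {
        // a real implementation would reuse one Connection instead of opening it per call
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = connection.getTable(TableName.valueOf("phone"))) {
            byte[] rowKey = (page.getId() + "_" + page.getSource()).getBytes(); // e.g. 11155386088_jd.com
            List<Put> puts = new ArrayList<>();
            // ... build the Put objects for each column as shown above ...
            table.put(puts);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}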

Of course, when initializing the crawler, you can manually choose where to store the data:


// 3. inject the store
iSpider.setStore(new HBaseStoreImpl());

There is currently no code for storing the data in more than one place at the same time, but given how the code is structured it would be easy to add.
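As an illustration of how small that change could be, a composite store could simply delegate to several IStore implementations. This is only a sketch of the idea, not code from the project:

// hypothetical composite store: writes each page to every configured back end
public class MultiStoreImpl implements IStore {
    private final List<IStore> stores;

    public MultiStoreImpl(IStore... stores) {
        this.stores = Arrays.asList(stores);
    }

    @Override
    public void store(Page page) {
        for (IStore store : stores) {
            store.store(page); // e.g. MySQL first, then HBase
        }
    }
}

// usage: iSpider.setStore(new MultiStoreImpl(new MySQLStoreImpl(), new HBaseStoreImpl()));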

In fact, you can save the data to MySQL and then import it to HBase through Sqoop. For details, see my Sqoop article.

Still, if you do want to save data directly to HBase, make sure you have a cluster environment available and that the following configuration files have been added to the classpath:



core-site.xml

hbase-site.xml

hdfs-site.xml

If you are interested in big data, you can play around with this; if not, you can use MySQL storage directly, and only need to inject the MySQL store when initializing the crawler:


// 3. inject the store
iSpider.setStore(new MySQLStoreImpl());

URL scheduling system



The URL scheduling system is the bridge and the key to making the whole crawler system distributed. It is the URL scheduling system that lets the crawlers obtain URLs efficiently and at random (with Redis as storage), so that the whole system can run distributed.

URL repository

As you can see from the architecture diagram, the URL repository is nothing more than Redis: Redis is used to store the lists of URLs to be crawled.

This is what makes the distributed deployment work: as long as the stored URLs are unique, no matter how many crawler nodes we run, only one copy of each URL is kept and the data will not be crawled repeatedly.

At the same time, the URL consumption policy of the repository is implemented through queues, which will become clear from the URL scheduler implementation below.

In addition, in our URL repository, we mainly store the following data:

Seed URL list, Redis data type is list

The seed URLs are stored persistently. At regular intervals, the URL timer takes the seed URLs and pushes them into the high-priority URL queues that our crawlers consume.

In this way the crawler can keep crawling data continuously without the program having to be restarted by hand.

High-priority URL queue; the Redis data type is list

What is the high-priority URL queue? It is used to hold list URLs. And what is a list URL?

Put simply, one list page contains multiple products. Take JD.com as an example and open a mobile phone list page, for example https://list.jd.com/list.html?cat=9987,653655&page=1.

That address is not the URL of a specific product; rather, it points to a list of many products whose data we need to crawl.

By parsing each high-priority URL we obtain many specific product URLs; those product URLs are the low-priority URLs, which are saved into the low-priority URL queue.

Taking this system as an example, the data saved looks like this:


jd.com.higher
    https://list.jd.com/list.html?cat=9987,653655&page=1
    ...
suning.com.higher
    https://list.suning.com/0-20006-0.html
    ...

Low-priority URL queue; the Redis data type is list

A low-priority URL is the URL of a specific product, for example a single phone's item page.

By downloading and parsing the data at that URL, we get the data we want.

Taking this system as an example, the data saved looks like this:


jd.com.lower
    https://item.jd.com/23545806622.html
    ...
suning.com.lower
    https://product.suning.com/0000000000/690128156.html
    ...

URL scheduler

The so-called URL scheduler is simply the Java code that implements the scheduling strategy over the URL repository; because its core is scheduling, it is described here under the URL scheduler. It is currently developed against the following interface:


/**
 * URL repository
 * Main functions:
 *   add URLs to the repository (high-priority list URLs, low-priority product URLs)
 *   get URLs from the repository (higher-priority URLs first, lower-priority ones if none are left)
 */
public interface IRepository {

    /**
     * Get a URL from the repository
     * (higher-priority URLs first, lower-priority ones if none are left)
     * @return
     */
    public String poll();

    /**
     * Add a product list URL to the high-priority queue
     * @param highUrl
     */
    public void offerHigher(String highUrl);

    /**
     * Add a product URL to the low-priority queue
     * @param lowUrl
     */
    public void offerLower(String lowUrl);
}

Its implementation based on Redis as a URL warehouse is as follows:


/**
 * Whole-web crawler repository based on Redis; crawler URLs are fetched at random.
 *
 * The data structures used to store URLs in Redis are as follows:
 * 1. The set of domain names to be crawled (data type SET; this must be added to Redis beforehand)
 *    key
 *        spider.website.domains
 *    value(set)
 *        jd.com  suning.com  gome.com
 *    The key is taken from the constant SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY
 * 2. The high- and low-priority URL queues for each domain (data type LIST; filled dynamically after the crawler parses the seed URLs)
 *    key
 *        jd.com.higher
 *        jd.com.lower
 *        suning.com.higher
 *        suning.com.lower
 *        gome.com.higher
 *        gome.com.lower
 *    value(list)
 *        the list of URLs waiting to be parsed
 *    The key is a random domain plus the constant SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX or SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX
 * 3. The seed URL list
 *    key
 *        spider.seed.urls
 *    value(list)
 *        the seed URLs of the data to be crawled
 *    The key is taken from the constant SpiderConstants.SPIDER_SEED_URLS_KEY
 *
 * The URLs in the seed URL list are periodically pushed into the high-priority URL queues by the URL timer
 */
public class RandomRedisRepositoryImpl implements IRepository {

    /**
     * Constructor
     */
    public RandomRedisRepositoryImpl() {
        init();
    }

    /**
     * Initialization: delete any high- and low-priority URL queues that already exist in Redis.
     * Otherwise, if the queues were not fully consumed before the program last stopped,
     * restarting would leave duplicate URLs in the repository.
     */
    public void init() {
        Jedis jedis = JedisUtil.getJedis();
        Set<String> domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY);
        String higherUrlKey;
        String lowerUrlKey;
        for (String domain : domains) {
            higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;
            lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;
            jedis.del(higherUrlKey, lowerUrlKey);
        }
        JedisUtil.returnJedis(jedis);
    }

    /**
     * Get a URL from the queues. The current policy is:
     * 1. try the high-priority URL queue first
     * 2. then try the low-priority URL queue
     * For our scenario, list URLs should be parsed before product URLs.
     * Note, however, that in a distributed multi-threaded environment this is not strictly guaranteed:
     * at some point the high-priority queue may look empty while another thread is still parsing the last
     * high-priority URL; a thread polling at that moment gets nothing from the high-priority queue and
     * falls through to the low-priority one. This matters when you analyse the behaviour in practice.
     * @return
     */
    @Override
    public String poll() {
        // pick a random top-level domain from the set
        Jedis jedis = JedisUtil.getJedis();
        String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); // jd.com
        String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;             // jd.com.higher
        String url = jedis.lpop(key);
        if (url == null) { // if empty, fall back to the low-priority queue
            key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;                 // jd.com.lower
            url = jedis.lpop(key);
        }
        JedisUtil.returnJedis(jedis);
        return url;
    }

    /**
     * Add a URL to the high-priority URL queue
     * @param highUrl
     */
    @Override
    public void offerHigher(String highUrl) {
        offerUrl(highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX);
    }

    /**
     * Add a URL to the low-priority URL queue
     * @param lowUrl
     */
    @Override
    public void offerLower(String lowUrl) {
        offerUrl(lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX);
    }

    /**
     * Generic way of adding a URL, extracted from offerHigher and offerLower
     * @param url           the URL to add
     * @param urlTypeSuffix the URL type suffix, .higher or .lower
     */
    public void offerUrl(String url, String urlTypeSuffix) {
        Jedis jedis = JedisUtil.getJedis();
        String domain = SpiderUtil.getTopDomain(url); // the URL's top-level domain, e.g. jd.com
        String key = domain + urlTypeSuffix;          // the key of the URL queue, e.g. jd.com.higher
        jedis.lpush(key, url);                        // push the URL onto the queue
        JedisUtil.returnJedis(jedis);
    }
}

From the code we can see that the core of the scheduler is simply how URLs are scheduled in the URL repository (Redis).
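To put the pieces together, the main loop of a crawler node is conceptually nothing more than poll, download, parse, then store or re-offer. The following is a hedged sketch of that loop built only from the interfaces shown above; the method shape and field handling are assumptions, not the project's actual spider code:

// hypothetical sketch of a crawler node's main loop, tying IRepository, IDownload, IParser and IStore together
public void run(IRepository repository, IDownload download, IParser parser, IStore store) {
    while (true) {
        String url = repository.poll();            // high-priority list URLs first, then product URLs
        if (url == null) {                         // repository temporarily empty
            try { Thread.sleep(1000); } catch (InterruptedException e) { break; }
            continue;
        }
        Page page = download.download(url);        // fetch the page, possibly through a random proxy
        parser.parser(page);                       // fill the product fields, or collect product URLs on a list page
        if (!page.getUrls().isEmpty()) {
            for (String productUrl : page.getUrls()) {
                repository.offerLower(productUrl); // product URLs go into the low-priority queue
            }
        } else {
            store.store(page);                     // persist a parsed product page
        }
    }
}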

URL timer

After a while, the URLs in both the high-priority and low-priority URL queues will have been consumed.

To let the program keep crawling data with as little human intervention as possible, the seed URLs can be inserted into Redis in advance; the URL timer then periodically reads the seed URLs and pushes them into the high-priority URL queues, so that data keeps being crawled on a regular schedule.

Whether the data needs to be crawled over and over again once the URLs are consumed depends on your own business needs, so this step is not required; it is simply provided.

In practice the data we crawl is itself updated every so often, so if we want our copy to stay up to date, the timer is important.

Note, however, that once you decide the data should be crawled repeatedly, the store implementation has to handle duplicate data, i.e. turn a duplicate insert into an update.

The store I wrote does not include this; interested readers can implement it themselves, simply by checking whether the record already exists before inserting it, for example along the lines below.
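For the MySQL store, the simplest variant is probably to lean on the (id, source) primary key and turn the insert into an upsert. This is only a sketch of the changed SQL, not code from the project:

// hedged sketch: make MySQLStoreImpl idempotent via MySQL's upsert syntax on the (id, source) primary key
String sql = "insert into phone(id, source, brand, title, price, comment_count, url, img_url, params) "
        + "values(?, ?, ?, ?, ?, ?, ?, ?, ?) "
        + "on duplicate key update brand = values(brand), title = values(title), price = values(price), "
        + "comment_count = values(comment_count), url = values(url), img_url = values(img_url), params = values(params)";
queryRunner.update(sql, page.getId(), page.getSource(), page.getBrand(), page.getTitle(),
        page.getPrice(), page.getCommentCount(), page.getUrl(), page.getImgUrl(), page.getParams());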

Another point to note is that the URL timer is a separate process and needs to be started separately.

The timer is implemented based on Quartz, and here is the code for its job:


/**
 * Periodically (every day) fetch the seed URLs from the URL repository and add them to the high-priority queues
 */
public class UrlJob implements Job {

    // log4j logging
    private Logger logger = LoggerFactory.getLogger(UrlJob.class);

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        /**
         * 1. Get the seed URLs from the designated seed URL repository
         * 2. Add the seed URLs to the high-priority queues
         */
        Jedis jedis = JedisUtil.getJedis();
        Set<String> seedUrls = jedis.smembers(SpiderConstants.SPIDER_SEED_URLS_KEY); // spider.seed.urls is a Redis set, to prevent duplicate seed URLs
        for (String seedUrl : seedUrls) {
            String domain = SpiderUtil.getTopDomain(seedUrl); // the seed URL's top-level domain
            jedis.lpush(domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX, seedUrl); // the high-priority queue is a Redis list (see the repository), so lpush is used
            logger.info("Got seed: {}", seedUrl);
        }
        JedisUtil.returnJedis(jedis);
        // System.out.println("Scheduler Job Test...");
    }
}

The scheduler is implemented as follows:


/**
 * URL timer: periodically pushes the seed URLs into the corresponding URL queues
 *
 * Business rule: the seed URLs are pushed into the queues at 1:10 am every day
 */
public class UrlJobScheduler {

    public UrlJobScheduler() {
        init();
    }

    /**
     * Initialize the scheduler
     */
    public void init() {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

            // if the start method below is not called, the task will never be started
            scheduler.start();

            String name = "URL_SCHEDULER_JOB";
            String group = "URL_SCHEDULER_JOB_GROUP";
            JobDetail jobDetail = new JobDetail(name, group, UrlJob.class);
            String cronExpression = "0 10 1 * * ?";
            Trigger trigger = new CronTrigger(name, group, cronExpression);

            // schedule the task
            scheduler.scheduleJob(jobDetail, trigger);

        } catch (SchedulerException e) {
            e.printStackTrace();
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        UrlJobScheduler urlJobScheduler = new UrlJobScheduler();
        urlJobScheduler.start();
    }

    /**
     * Keep the process alive.
     * Seed URLs need to be fetched from the designated repository and pushed into the high-priority queues
     * at a fixed time every day, so this is a continuous process that must not exit.
     */
    private void start() {
        while (true) {
            try {
                Thread.sleep(60 * 1000); // keep the main thread alive without busy-spinning
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

Monitoring and alarm system



The monitoring and alarm system is added mainly so that node failures are detected actively rather than passively, because in practice the crawler may run continuously for a long time.

In addition, the crawler is deployed on many nodes, so the nodes need to be monitored and problems on them discovered and fixed in time. Note that the monitoring and alarm system is an independent process and needs to be started separately.

Basic principle

Create a /ispider node in ZooKeeper:



[zk: localhost:2181(CONNECTED) 1] create /ispider ispider

Created /ispider

The monitoring and alarm system is built mainly on ZooKeeper. The monitoring program watches this node directory in ZooKeeper:



[zk: localhost:2181(CONNECTED) 0] ls /ispider

[]

When a crawler starts, it registers an ephemeral child node under this directory:



[zk: localhost:2181(CONNECTED) 0] ls /ispider

[192.168.43.166]

When a node goes down, its ephemeral node is deleted by ZooKeeper:



[zk: localhost:2181(CONNECTED) 0] ls /ispider

[]

Because we watch the node directory /ispider, ZooKeeper notifies our monitoring program whenever a child node is deleted from (or added under) it.

In other words, our monitoring program gets a callback, and in that callback we can run the alarm logic, which completes the monitoring and alarm function.

ZooKeeper Java API usage guide

You can use ZooKeeper's native Java API; I did that in an RPC framework I wrote (whose remote communication is based on Netty underneath).

But the code is obviously more complex, and ZooKeeper itself takes more learning and understanding to use well.

Therefore, to reduce the development effort, a third-party wrapper API, Curator, is used to develop the ZooKeeper client program.

Registering the crawler with ZooKeeper

When the crawler system starts, it starts a ZooKeeper client and registers its node information, mainly its IP address, with ZooKeeper.

That is, it creates a node named after the crawler node's IP address under the /ispider directory, such as /ispider/192.168.43.116. The code is as follows:


/**
 * Register with ZooKeeper
 */
private void registerZK() {
    String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
    int baseSleepTimeMs = 1000;
    int maxRetries = 3;
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
    CuratorFramework curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
    curator.start();
    String ip = null;
    try {
        // register this node by creating a child node under the /ispider directory in zk
        ip = InetAddress.getLocalHost().getHostAddress();
        curator.create().withMode(CreateMode.EPHEMERAL).forPath("/ispider/" + ip, ip.getBytes());
    } catch (UnknownHostException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Note that the node we create is an ephemeral node; it must be ephemeral for the monitoring and alarm function to work.

Monitoring program

First, we need to watch a node directory in ZooKeeper; in our system the design is to watch the /ispider directory:


public SpiderMonitorTask() {
    String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
    int baseSleepTimeMs = 1000;
    int maxRetries = 3;
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
    curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
    curator.start();
    try {
        previousNodes = curator.getChildren().usingWatcher(this).forPath("/ispider");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

This registers a Watcher with ZooKeeper, i.e. the callback that receives notifications, and in that callback we execute our alarm logic:


/**
 * This method is called whenever the watched ZK directory changes.
 * We fetch the latest list of child nodes and compare it with the previous list, so we know which node caused the change.
 * @param event
 */
@Override
public void process(WatchedEvent event) {
    try {
        List<String> currentNodes = curator.getChildren().usingWatcher(this).forPath("/ispider");
        // HashSet<String> previousNodesSet = new HashSet<>(previousNodes);
        if (currentNodes.size() > previousNodes.size()) { // a new crawler node has been added
            for (String node : currentNodes) {
                if (!previousNodes.contains(node)) {
                    // this is the newly added node
                    logger.info("---- new crawler node {} added", node);
                }
            }
        } else if (currentNodes.size() < previousNodes.size()) { // a node went down: send an alarm email or SMS
            for (String node : previousNodes) {
                if (!currentNodes.contains(node)) {
                    // this node has gone down: an email needs to be sent
                    logger.info("---- crawler node {} is down", node);
                    MailUtil.sendMail("A crawler node is down, please check it manually. Node information: ", node);
                }
            }
        } // if as many nodes were added as were removed, the sizes are equal and neither branch fires; that special case is not handled here
        previousNodes = currentNodes; // the latest node list becomes the "previous" list for the next callback
    } catch (Exception e) {
        e.printStackTrace();
    }
    // with the native API you would need to register the watch again here, because each watch fires only once;
    // when using the Curator API this is not required
}

Of course, the logic above for deciding whether a node went down has a flaw: according to it, if one node is added and another removed at the same time, the change cannot be detected. If you need it to be more accurate, the code can be adjusted, for example as sketched below.
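One hedged way to make the comparison robust, regardless of whether the sizes differ, is to compute two set differences instead of comparing sizes. The fragment below would slot into process() in place of the size comparison; it is a sketch, not code from the project:

// hypothetical replacement for the size comparison: detect added and removed nodes independently
Set<String> added = new HashSet<>(currentNodes);
added.removeAll(previousNodes);   // nodes present now but not before
Set<String> removed = new HashSet<>(previousNodes);
removed.removeAll(currentNodes);  // nodes present before but gone now

for (String node : added) {
    logger.info("---- new crawler node {} added", node);
}
for (String node : removed) {
    logger.info("---- crawler node {} is down", node);
    MailUtil.sendMail("A crawler node is down, please check it manually. Node information: ", node);
}
previousNodes = currentNodes;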

Mail sending module

The usual template code will do; just note that the sender must be an email account of your own.
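The project's MailUtil is not reproduced in the original text. A minimal sketch of such a utility with the JavaMail API might look like the following; the SMTP host, the account, the recipients and the sendMail(content, node) signature used above are assumptions:

// hedged sketch of a MailUtil based on the JavaMail API; host, account and recipients are placeholders
public class MailUtil {

    public static void sendMail(String content, String node) {
        Properties props = new Properties();
        props.put("mail.smtp.host", "smtp.example.com"); // your SMTP server
        props.put("mail.smtp.auth", "true");
        Session session = Session.getInstance(props, new Authenticator() {
            @Override
            protected PasswordAuthentication getPasswordAuthentication() {
                return new PasswordAuthentication("sender@example.com", "password");
            }
        });
        try {
            Message message = new MimeMessage(session);
            message.setFrom(new InternetAddress("sender@example.com")); // use your own sender address
            message.setRecipients(Message.RecipientType.TO, InternetAddress.parse("ops@example.com"));
            message.setSubject("Crawler node alarm");
            message.setText(content + node);
            Transport.send(message);
        } catch (MessagingException e) {
            e.printStackTrace();
        }
    }
}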

Here is the email received when a crawler node went down:



In fact, if you purchase an SMS service, text messages can also be sent to our phones through an SMS API.

In practice: crawling mobile phone product data from JD.com and Suning

As mentioned at the beginning, I only wrote web page parsers for JD.com and Suning, so the next step is to crawl their mobile phone product data across the whole site.

Environment

Make sure the Redis and ZooKeeper services are available. If HBase is used to store the data, also make sure HBase is available in the Hadoop cluster and that the related configuration files have been added to the crawler's classpath.
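Before the first run, the domain set and the seed URLs also need to exist in Redis. Based on the key names given in the repository comments (spider.website.domains and spider.seed.urls), seeding might look roughly like this; treat the exact keys and data types as assumptions to be checked against the code:

// hedged example of seeding Redis before the first run; key names are taken from the repository comments
Jedis jedis = JedisUtil.getJedis();
jedis.sadd("spider.website.domains", "jd.com", "suning.com"); // the domains to crawl
jedis.sadd("spider.seed.urls",
        "https://list.jd.com/list.html?cat=9987,653655&page=1",
        "https://list.suning.com/0-20006-0.html");            // seed list-page URLs
JedisUtil.returnJedis(jedis);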

Also note that the URL timer and the monitoring and alarm system run as separate processes, and both are optional.

Crawl results

Two crawls were performed, saving the data to MySQL and to HBase respectively. The results are shown below.

Saving to MySQL


mysql> select count(*) from phone;
+----------+
| count(*) |
+----------+
|    12052 |
+----------+
1 row in set

mysql> select count(*) from phone where source='jd.com';
+----------+
| count(*) |
+----------+
|     9578 |
+----------+
1 row in set

mysql> select count(*) from phone where source='suning.com';
+----------+
| count(*) |
+----------+
|     2474 |
+----------+
1 row in set

View the data in the visualization tool:



Saving to HBase



hbase(main):225:0* count 'phone'

Current count: 1000, row: 11155386088_jd.com

Current count: 2000, row: 136191393_suning.com

Current count: 3000, row: 16893837301_jd.com

Current count: 4000, row: 19036619855_jd.com

Current count: 5000, row: 1983786945_jd.com

Current count: 6000, row: 1997392141_jd.com

Current count: 7000, row: 21798495372_jd.com

Current count: 8000, row: 24154264902_jd.com

Current count: 9000, row: 25687565618_jd.com

Current count: 10000, row: 26458674797_jd.com

Current count: 11000, row: 617169906_suning.com

Current count: 12000, row: 769705049_suning.com

12348 row(s) in 1.5720 seconds

=> 12348

Viewing data in HDFS:



Data volume analysis

JD.com: the JD.com mobile phone listing has about 160 pages, each with 60 products, so the total is about 9600; our figure is basically consistent with that.

Later log analysis shows that the lost records were generally caused by connection timeouts, so it is recommended to run the crawler on hosts with a good network environment.

It would also help to have an IP proxy library. In addition, connection timeouts could be handled further in the program.

Once a URL fails to be crawled, it could be added to a retry URL queue. I have not implemented this; interested readers can try it, roughly along the lines sketched below.
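A hedged sketch of that idea, reusing the repository interface; the retry-count bookkeeping and the choice to re-offer to the low-priority queue are assumptions, not code from the project:

// hypothetical retry handling: re-offer a failed URL a limited number of times
private final Map<String, Integer> retryCounts = new ConcurrentHashMap<>();
private static final int MAX_RETRIES = 2; // assumed limit

private void handleFailedUrl(IRepository repository, String url) {
    int attempts = retryCounts.merge(url, 1, Integer::sum);
    if (attempts <= MAX_RETRIES) {
        repository.offerLower(url); // put it back so it is tried again later, possibly by another node
    } else {
        retryCounts.remove(url);
        // give up; the URL could also be written to a dedicated "failed" list for later inspection
    }
}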

Suning: the Suning mobile phone listing has about 100 pages, each also with 60 products, so the total is about 6000.

But our figure is only around 3000 (the remaining gap is again connection failures caused by crawling too frequently). Why?

Because when a Suning list page is opened, only 30 products are loaded at first; the other 30 are loaded through a separate API when you scroll down, and this is true for every list page.

Once you know the reason it is not hard to handle, but for lack of time I did not do it; interested readers can tinker with it.

Analyzing crawler performance from the logs

In our crawler system every key step, such as downloading a page or parsing data, is logged, so the relevant timings can be roughly analyzed from the logs.


2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - Downloaded page: https://list.jd.com/list.html?cat=9987,653655&page=1, time taken: 590 ms, proxy: null:null
2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - Parsed list page: https://list.jd.com/list.html?cat=9987,653655&page=1, time taken: 46 ms
2018-04-01 21:26:03 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - Parsed list page: https://list.suning.com/0-20006-0.html, time taken: 49 ms
2018-04-01 21:26:04 [pool-1-thread-5] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - Downloaded page: https://item.jd.com/6737464.html, time taken: 219 ms, proxy: null:null
2018-04-01 21:26:04 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - Downloaded page: https://list.jd.com/list.html?cat=9987,653655&page=2&sort=sort_rank_asc&trans=1&JL=6_0_0, time taken: 276 ms, proxy: null:null
2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - Downloaded page: https://list.suning.com/0-20006-99.html, time taken: 300 ms, proxy: null:null
2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - Parsed list page: https://list.suning.com/0-20006-99.html, time taken: 4 ms
...
2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - Downloaded page: https://club.jd.com/comment/productCommentSummaries.action?referenceIds=23934388891, time taken: 176 ms, proxy: null:null
2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - Parsed product page: https://item.jd.com/23934388891.html, time taken: 413 ms
2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - Downloaded page: https://review.suning.com/ajax/review_satisfy/general-00000000010017793337-0070079092-----satisfy.htm, time taken: 308 ms, proxy: null:null
2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - Parsed product page: https://product.suning.com/0070079092/10017793337.html, time taken: 588 ms
...

On average, downloading one product page takes 200 to 500 milliseconds, depending on the network at the time.

In addition, if you want to work out the actual time to crawl one product, you can add up the following figures from the log:

the time to download the product page

the time to fetch the price data

the time to fetch the review data

On my host (CPU: E5 with 10 cores, RAM: 32 GB, running 1 VM and then 3 VMs), the results were as follows:



As you can see, with 3 nodes the time does not drop to one third of the original. This is because at that point the main factor limiting crawler performance is the network: with many nodes there are many threads and many concurrent network requests,

but the bandwidth is fixed, and without proxies the frequent requests lead to more connection failures, which also costs time. With a random proxy library the situation would be much better.

However, it is certain that adding crawler nodes by scaling horizontally can greatly reduce the crawl time; that is the benefit of a distributed crawler system.

Anti-anti-crawler strategies in the crawler system

In the design of the whole crawler system, the following strategies are used to get around anti-crawler measures:

Access through proxies: the IP proxy library and random IP proxies.

Random access across top-level domains: the URL scheduling system.

Each thread sleeps for a short time after each crawl (see the sketch below).
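As a trivial illustration of the last point (the exact bounds are assumptions, not the project's values):

// sleep for a short random interval after each crawl to avoid hammering the site
try {
    Thread.sleep(ThreadLocalRandom.current().nextLong(500, 2000)); // 0.5–2 s, assumed range
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}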


Conclusion

It should be noted again that this system is implemented in Java, but personally I think the language itself is not the issue; the core lies in the design and in understanding the whole system.

I wrote this article hoping to share the architecture of such a distributed crawler system with everyone. If you are interested in the source code, you can view it on my GitHub.


Originally published at: June 6, 2018

Author: Ye Yonghao

This article is from The cloud community partner “Mesozoic Technology”. For related information, you can follow “Mesozoic Technology”.