Recently, a business need came up to monitor some data. Although there are many excellent crawler frameworks on the market, I still decided to implement a complete crawler framework from scratch.

In terms of technology selection, I chose the lighter Vert.x rather than Spring to build the project. Spring is too heavyweight for this purpose, while Vert.x is a lightweight, high-performance framework on the JVM. It is event-driven and asynchronous, builds on the fully asynchronous Java server Netty, and extends it with many other features.

Project address: https://github.com/fengzhizi715/NetDiscovery

I. Functions of the crawler framework

The crawler framework includes a SpiderEngine and a Spider. SpiderEngine can manage multiple spiders.

1.1 Spider

The main components of a Spider are the Downloader, Queue, Parser, Pipeline, and ProxyPool. ProxyPool is a separate project I wrote a while ago; I brought it in because switching proxy IPs is often necessary when running a crawler.

Proxypool address: https://github.com/fengzhizi715/ProxyPool

The other four components are interfaces, and the crawler framework ships with several implementations. For example, multiple downloaders are built in, including ones based on Vert.x's WebClient, HttpClient, OkHttp3, and Selenium. Developers can use these or implement new downloaders of their own.

The download() method of Downloader returns a Maybe<Response>.

package com.cv4j.netdiscovery.core.downloader;

import com.cv4j.netdiscovery.core.domain.Request;
import com.cv4j.netdiscovery.core.domain.Response;
import io.reactivex.Maybe;

/**
 * Created by tony on 2017/12/23.
 */
public interface Downloader {

    Maybe<Response> download(Request request);

    void close();
}
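As an illustration of this extension point, here is a minimal sketch of a custom Downloader backed by OkHttp3. The OkHttpDownloader class is my own example, and I am assuming that Response exposes setContent() and setStatusCode() setters mirroring the getters used later in this article; this is not the framework's actual implementation.

import com.cv4j.netdiscovery.core.domain.Request;
import com.cv4j.netdiscovery.core.domain.Response;
import io.reactivex.Maybe;
import okhttp3.OkHttpClient;

public class OkHttpDownloader implements Downloader {

    private final OkHttpClient client = new OkHttpClient();

    @Override
    public Maybe<Response> download(Request request) {

        return Maybe.fromCallable(() -> {
            // Execute a blocking OkHttp call, then wrap the result in the
            // framework's Response object (setters assumed for this sketch).
            okhttp3.Request okRequest = new okhttp3.Request.Builder()
                    .url(request.getUrl())
                    .build();

            try (okhttp3.Response okResponse = client.newCall(okRequest).execute()) {
                Response response = new Response();
                response.setContent(okResponse.body() != null ? okResponse.body().string() : "");
                response.setStatusCode(okResponse.code());
                return response;
            }
        });
    }

    @Override
    public void close() {
        // Release OkHttp's threads when the spider shuts down
        client.dispatcher().executorService().shutdown();
    }
}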

Inside Spider, a series of subsequent chained calls is built on this Maybe: the Response is converted into a Page object, the Page is then parsed, and after parsing the Page goes through a series of Pipeline operations.

                  downloader.download(request)
                            .observeOn(Schedulers.io())
                            .map(new Function<Response, Page>() {

                                @Override
                                public Page apply(Response response) throws Exception {

                                    Page page = new Page();
                                    page.setHtml(new Html(response.getContent()));
                                    page.setRequest(request);
                                    page.setUrl(request.getUrl());
                                    page.setStatusCode(response.getStatusCode());

                                    return page;
                                }
                            })
                            .map(new Function<Page, Page>() {

                                @Override
                                public Page apply(Page page) throws Exception {

                                    if (parser != null) {

                                        parser.process(page);
                                    }

                                    return page;
                                }
                            })
                            .map(new Function<Page, Page>() {

                                @Override
                                public Page apply(Page page) throws Exception {

                                    if (Preconditions.isNotBlank(pipelines)) {

                                        pipelines.stream()
                                                .forEach(pipeline -> pipeline.process(page.getResultItems()));
                                    }

                                    return page;
                                }
                            })
                            .subscribe(new Consumer<Page>() {

                                @Override
                                public void accept(Page page) throws Exception {

                                    log.info(page.getUrl());

                                    if (request.getAfterRequest() != null) {

                                        request.getAfterRequest().process(page);
                                    }
                                }
                            }, new Consumer<Throwable>() {

                                @Override
                                public void accept(Throwable throwable) throws Exception {

                                    log.error(throwable.getMessage());
                                }
                            });

Using RxJava 2 here makes the entire crawler framework feel much more reactive 🙂
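Parser and Pipeline are equally small interfaces. Judging from the calls parser.process(page) and pipeline.process(page.getResultItems()) in the chain above, custom implementations might look like the following sketch. The ResultItems type name, its put()/get() methods, and the exact interface signatures are assumptions on my part rather than the framework's actual API, and import statements are omitted because the package layout is not shown here.

// A sketch of a custom Parser: pull something out of the Page and
// store it for the pipelines (ResultItems and put() are assumed).
public class UrlParser implements Parser {

    @Override
    public void process(Page page) {

        page.getResultItems().put("url", page.getUrl());
    }
}

// A sketch of a custom Pipeline: consume whatever the parser stored
// (the get() method is likewise assumed).
public class ConsolePipeline implements Pipeline {

    @Override
    public void process(ResultItems resultItems) {

        System.out.println(resultItems.get("url"));
    }
}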

1.2 SpiderEngine

A SpiderEngine can contain multiple Spiders. You can add an existing crawler to the SpiderEngine with addSpider(), or use createSpider() to create a new Spider and add it to the SpiderEngine in one step.

If the httpd(port) method is called, the individual spiders inside the SpiderEngine can also be monitored over HTTP.
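For example, a minimal sketch that registers two crawlers and turns on the monitoring endpoint. I am assuming here that createSpider() returns the newly created Spider so it can be configured fluently; only addSpider() appears in the full example later.

        SpiderEngine engine = SpiderEngine.create();

        // Register an existing Spider
        engine.addSpider(Spider.create().name("demo1"));

        // Or let the engine create one directly (assumed to return the new Spider)
        engine.createSpider()
              .name("demo2")
              .repeatRequest(10000, "http://www.163.com");

        // Expose the monitoring endpoints and start all spiders
        engine.httpd(8080);
        engine.run();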

1.2.1 Obtaining the state of a crawler

http://localhost:{port}/netdiscovery/spider/{spiderName}

Type: GET

1.2.2 Getting the state of all SpiderEngine crawlers

http://localhost:{port}/netdiscovery/spiders/

Type: GET

1.2.3 Modifying the state of a crawler

http://localhost:{port}/netdiscovery/spider/{spiderName}/status

Type: POST

Parameter Description:

{
    "status":2   // Pause the crawler
}
status    role
2         Pause the crawler
3         Resume the crawler from its paused state
4         Stop the crawler
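For instance, to pause a crawler named tony1, the status endpoint can be called with any HTTP client. Below is a sketch using OkHttp3; port 8080 and the spider name are taken from the example that follows and are assumptions for illustration.

import okhttp3.*;

public class PauseSpiderExample {

    public static void main(String[] args) throws Exception {

        OkHttpClient client = new OkHttpClient();

        // {"status":2} pauses the crawler, as listed in the table above
        RequestBody body = RequestBody.create(
                MediaType.parse("application/json; charset=utf-8"),
                "{\"status\":2}");

        Request request = new Request.Builder()
                .url("http://localhost:8080/netdiscovery/spider/tony1/status")
                .post(body)
                .build();

        try (Response response = client.newCall(request).execute()) {
            System.out.println(response.body().string());
        }
    }
}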

Example of using the framework

Create a SpiderEngine, and then create three spiders, each crawling a page at regular intervals.

        SpiderEngine engine = SpiderEngine.create();

        Spider spider = Spider.create()
                .name("tony1")
                .repeatRequest(10000, "http://www.163.com")
                .initialDelay(10000);

        engine.addSpider(spider);

        Spider spider2 = Spider.create()
                .name("tony2")
                .repeatRequest(10000, "http://www.baidu.com")
                .initialDelay(10000);

        engine.addSpider(spider2);

        Spider spider3 = Spider.create()
                .name("tony3")
                .repeatRequest(10000, "http://www.126.com")
                .initialDelay(10000);

        engine.addSpider(spider3);

        engine.httpd(8080);
        engine.run();

After the program has been running for a while, open http://localhost:8080/netdiscovery/spiders in a browser.

We can see the results of three crawlers running.

Formatted JSON:

{
	"code": 200,
	"data": [{
		"downloaderType": "VertxDownloader",
		"leftRequestSize": 0,
		"queueType": "DefaultQueue",
		"spiderName": "tony2",
		"spiderStatus": 1,
		"totalRequestSize": 7
	}, {
		"downloaderType": "VertxDownloader",
		"leftRequestSize": 0,
		"queueType": "DefaultQueue",
		"spiderName": "tony3",
		"spiderStatus": 1,
		"totalRequestSize": 7
	}, {
		"downloaderType": "VertxDownloader",
		"leftRequestSize": 0,
		"queueType": "DefaultQueue",
		"spiderName": "tony1",
		"spiderStatus": 1,
		"totalRequestSize": 7
	}],
	"message": "success"
}

Case

Recently I have been paying more attention to blockchain, so I made a program that captures the prices of three digital currencies in real time.

This program is already live: you can get the latest prices of these digital currencies in real time by "asking" my WeChat official account.

TODO

  1. Add recognition of login verification codes (CAPTCHAs)
  2. Add support for Elasticsearch

Conclusion

This crawler framework has only just gotten started, and I have studied many good crawler frameworks for reference. In the future, I will consider adding screenshot capture to the framework so that data contained in images can be analyzed, possibly even in combination with the CV4J framework. Before the New Year, recognition of login verification codes will be the priority for the crawler framework.