1: Project background

Search and recommendation are at the heart of in-app content distribution: they determine whether high-quality content actually reaches its target users. They involve word segmentation, synonyms, stop words, multi-platform support, relevance and similarity algorithms, fine-grained ranking, and so on. In this post I will share my understanding of search from several of these angles.

2: Search word segmentation

There are many word-segmentation components in the ElasticSearch ecosystem; which one to choose mainly depends on your business requirements and how the company's business is developing. The common choices are elasticsearch-analysis-ik and elasticsearch-jieba-plugin (IK is recommended). If your business is growing fast and you need to support multiple languages, take a look at Stanford CoreNLP, which supports Arabic, Chinese, English, French, German, and Spanish; however, there is no ready-made plugin for ElasticSearch, so you have to build the integration yourself. It also supports part-of-speech analysis and sentiment analysis. A comparison is given below:

- elasticsearch-analysis-ik: supports Chinese and English segmentation; stop words for Chinese segmentation can be hot-updated. Installation: elasticsearch-plugin install Github.com/medcl/elast…
- elasticsearch-analysis-dynamic-synonym: synonyms can be dynamically updated (hot update). The built-in synonym filter does not support hot updates, which is why this plugin is recommended. It cannot be installed online; you need to compile the zip yourself and keep it consistent with your ElasticSearch version. Installation: elasticsearch-plugin install file:///Users/xxx/workspace/source/elasticsearch-analysis-dynamic-synonym/target/releases/elasticsearch-analysis-dynamic-synonym-7.2.0.zip
- elasticsearch-jieba-plugin: version 7.2.0 is not supported at the moment. For details see: Github.com/sing1ee/ela…
- Stanford CoreNLP: we use it in our recommendation system but have not studied it in depth; hot updates of the segmentation dictionary are not supported. Plugin: Github.com/godlockin/e… For a more detailed introduction see: elasticsearch.cn/article/634…

In summary, for general scenarios I recommend elasticsearch-analysis-ik together with elasticsearch-analysis-dynamic-synonym. If multiple languages are involved, Stanford CoreNLP is recommended. A quick way to sanity-check the IK tokenizer is shown below.
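For example, once elasticsearch-analysis-ik is installed you can verify its segmentation directly with the _analyze API; the text below is the canonical example from the plugin's README (the exact token list may vary with the IK version and its dictionaries):

POST /_analyze
{
  "analyzer": "ik_max_word",
  "text": "中华人民共和国国歌"
}

ik_max_word emits the most fine-grained, overlapping tokens (中华人民共和国, 中华人民, 人民共和国, 国歌, …), while ik_smart produces the coarsest split; a common practice is to use ik_max_word at index time and ik_smart at query time.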

3: Pluggable similarity algorithm

ElasticSearch used TF-IDF before version 5.0 and BM25 from version 5.0 onward. TF-IDF determines the relevance of a query by measuring how common a word is locally (within the document) and how rare it is globally (across the corpus). BM25 builds on TF-IDF and fixes some of its defects, so that scoring results match the user's query more closely.

(Figure omitted: a chart from the official ElasticSearch documentation comparing how TF-IDF and BM25 scores grow with term frequency.) BM25 behaves more stably than TF-IDF. Another good feature of BM25 is that it provides two tunable parameters:

- k1: controls how quickly term-frequency saturation is reached. The default value is 1.2; the smaller the value, the faster saturation sets in, and the larger the value, the more slowly.
- b: controls the effect of field-length normalization; 0.0 disables normalization and 1.0 applies full normalization. The default value is 0.75.
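For reference, both parameters appear in the classic Okapi BM25 scoring function (implementations such as Lucene's differ only by constant factors that do not change the ranking). In LaTeX form, where f(q_i, D) is the frequency of term q_i in document D, |D| is the length of D, and avgdl is the average document length in the index:

\[
\mathrm{score}(D, Q) = \sum_{i=1}^{n} \mathrm{IDF}(q_i) \cdot
\frac{f(q_i, D)\,(k_1 + 1)}{f(q_i, D) + k_1 \left(1 - b + b \cdot \frac{|D|}{\mathrm{avgdl}}\right)}
\]

Reading the denominator: k1 caps how much repeated occurrences of a term can keep raising the score (term-frequency saturation), while b scales the length penalty |D|/avgdl.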

It is generally recommended to use BM25 for similarity calculation; the relevant ES settings are as follows:

PUT /bm25_test
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1,
    "index": {
      "analysis": {
        "filter": {
          "cn_synonyms": {
            "type": "dynamic_synonym",
            "synonyms_path": "http://ip:port/cn_synonyms.txt",
            "interval": 30
          },
          "en_synonyms": {
            "type": "dynamic_synonym",
            "synonyms_path": "http://ip:port/en_synonyms.txt",
            "interval": 30
          },
          "local_synonym": {
            "type": "dynamic_synonym",
            "synonyms_path": "synonym.txt"
          },
          "en_stopwords": {
            "type": "stop",
            "stopwords_path": "/etc/elasticsearch/en_stopwords.txt"
          }
        },
        "analyzer": {
          "cn_fenci": {
            "type": "custom",
            "tokenizer": "ik_max_word",
            "filter": ["cn_synonyms", "lowercase"],
            "char_filter": ["html_strip", "&_to_and"]
          },
          "en_fenci": {
            "type": "custom",
            "tokenizer": "standard",
            "filter": ["en_synonyms", "lowercase", "en_stopwords"],
            "char_filter": ["html_strip", "&_to_and"]
          }
        },
        "char_filter": {
          "&_to_and": {
            "type": "mapping",
            "mappings": ["&=>and"]
          }
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "en_word": {
        "type": "text",
        "similarity": "BM25",
        "analyzer": "en_fenci"
      },
      "cn_word": {
        "type": "text",
        "similarity": "BM25",
        "analyzer": "cn_fenci"
      }
    }
  }
}

4: How Flink updates ElasticSearch in real time

1: Initialize the ElasticSearch sink and write data in batches using bulk loading.

public static HttpHost[] loadHostArray(String nodes) {
    if (httpHostArray == null) {
        String[] split = nodes.split(",");
        httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; ++i) {
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
    }
    return httpHostArray;
}


HttpHost[] httpHosts = this.loadHostArray(esNodes);
ElasticsearchSink.Builder<Doc> esSinkBuilder = new ElasticsearchSink.Builder<>(Arrays.asList(httpHosts), new DocSinkES());

esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            // full queue; re-add the document for indexing
            indexer.add(action);
        } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
            // malformed document; simply drop the request without failing the sink
        } else {
            // for all other failures, fail the sink;
            // here the failure is simply logged, but users can also choose to throw custom exceptions
            LOG.error(failure.getMessage());
        }
    }
});

esSinkBuilder.setBulkFlushInterval(3000);  // flush the bulk buffer every 3 seconds
esSinkBuilder.setBulkFlushMaxSizeMb(10);   // ...or once it reaches 10 MB
esSinkBuilder.setBulkFlushBackoff(true);   // retry rejected bulk requests with backoff
esSinkBuilder.setBulkFlushBackoffRetries(2);
esSink = esSinkBuilder.build();
esSink.disableFlushOnCheckpoint();
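The DocSinkES referenced above is not shown in the original snippet. A minimal sketch of what such an ElasticsearchSinkFunction could look like is below; the Doc accessors getIndexName(), getDocId(), and getJsonSource() are assumptions for illustration, not the project's actual class:

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;

// Minimal sketch: turn each Doc into an upsert so that inserts and updates
// coming from the binlog are handled by the same request type.
public class DocSinkES implements ElasticsearchSinkFunction<Doc> {
    @Override
    public void process(Doc doc, RuntimeContext ctx, RequestIndexer indexer) {
        UpdateRequest request = new UpdateRequest(doc.getIndexName(), doc.getDocId())
                .doc(doc.getJsonSource(), XContentType.JSON) // assumed: the row serialized as JSON
                .docAsUpsert(true);                          // insert if the doc does not exist yet
        indexer.add(request);
    }
}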

2: Our current scenario only supports synchronizing a single table, or multiple tables from a sharded (sub-database, sub-table) layout. You can design the relevant configuration to be generic, e.g. the target ES index name, doc_id, dbname, tablename, the database fields to synchronize, the fields to filter out, and so on; see the sketch after the snippet below.

SingleOutputStreamOperator<Tuple2<String, Binlog>> filterStream = binlogDataStream.process(new BinlogFilterProcessFunction(new HashSet<>(searchSyncInfos.keySet())));
SingleOutputStreamOperator<Doc> docStream = filterStream.process(new BinlogToDocProcessFunction(searchSyncInfos));
docStream.addSink(esSink);
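A hypothetical shape for one entry of searchSyncInfos, covering the generic fields listed above (all names here are illustrative, not the project's actual class):

import java.util.Set;

// Hypothetical per-table sync configuration; one instance per logical table.
public class SearchSyncInfo {
    private String esIndexName;        // target ES index
    private String docIdColumn;        // column whose value becomes the ES doc id
    private String dbName;             // source database name (or shard pattern)
    private String tableName;          // source table name (or shard pattern)
    private Set<String> syncColumns;   // columns to write into ES
    private Set<String> filterColumns; // columns to drop before indexing
    // getters/setters omitted for brevity
}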

5: Summary

ElasticSearch can cover essentially all search scenarios for small and medium-sized companies. If a scenario is complex, it is recommended to first think about its significance and value from a business perspective: instead of rushing to implement it for the business, step back and consider the input-output ratio, and strike a balance between the technology and the business scenario.