1: Project background

The app channel pages need to be optimized for content distribution so that high-quality content gets more exposure. Channel pages are category-scoped (content on a channel page can only come from that category), and the content volume of each channel page is not large. After careful consideration, we decided against filtering content through the recommendation system and distributing from there.

2: Technical scheme research

I had some prior exposure to Elasticsearch function_score (from Ruan Yiming's ES share), but not in depth. function_score lets you pass in Groovy scripts and compute scores dynamically. I verified the relevant functions on the cluster and had an initial discussion with the product team about the scoring dimensions. Could we adjust Elasticsearch's scoring weights through script_score? The answer was yes.

POST es_score_test/_search
{
  "_source": ["category_id", "parent_nature_id", "uid", "gmt_create", "second_nature_id"],
  "from": 0,
  "size": 100,
  "query": {
    "function_score": {
      "query": {
        "match": {
          "parent_nature_id": "1"
        }
      },
      "functions": [
        {
          "gauss": {
            "gmt_create": {
              "origin": "2019-03-01T14:10:30Z",
              "scale": "6d",
              "offset": "1d"
            }
          }
        },
        {
          "field_value_factor": {
            "field": "click_cout",
            "modifier": "log1p",
            "factor": 0.1
          }
        },
        {
          "script_score": {
            "script": {
              "params": {
                "threshold": 5,
                "discount": 0.8,
                "target": 102
              },
              "source": "double sortScore = 0; double price = doc['second_nature_id'].value; double margin = 10; if (price < params.threshold) { sortScore = price * margin / params.target } else { sortScore = price * (1 - params.discount) * margin / params.target } return sortScore"
            }
          }
        },
        {
          "script_score": {
            "script": {
              "params": {
                "threshold": 5,
                "discount": 0.8,
                "target": 10
              },
              "source": "double sortScore = 0; double price = doc['second_nature_id'].value; double margin = 10000; if (price < params.threshold) { sortScore = price * margin / params.target } else { sortScore = price * (1 - params.discount) * margin / params.target } return sortScore"
            }
          }
        }
      ],
      "boost_mode": "sum"
    }
  }
}

3: What dimensions should we score by?

| Scoring factor | Weight | Related test code |
| --- | --- | --- |
| Shelf time | Favor newer videos (recency slant) | |
| Views, favorites, and dubbings of the latest week/month (log2) | Different weights can be assigned to different behaviors | See the code below for details |
| HD, membership, manual curation, etc. | Downgrade/boost per factor | |
| User preference for secondary tags (browsing history) | Built from the user's historical behavior via the portrait system | |
POST es_score_test/_search
{
  "query": {
    "function_score": {
      "query": {
        "match": {
          "parent_nature_id": "356"
        }
      },
      "functions": [
        {
          "script_score": {
            "script": "Math.log(doc['click_cout'].value * 1 + doc['collect_cout'].value * 2 + doc['dub_cout'].value * 3)"
          }
        }
      ],
      "boost_mode": "sum"
    }
  }
}

4: What we did specifically

1: Design of the video ES index

1: A video does not map one-to-one to first-level and second-level categories (the relationship is many-to-many). If the data were flattened, one video would occupy multiple rows, but each video ID must exist only once. After technical research, we used the Elasticsearch array type to merge all categories belonging to the same video into a single document. (We worried about performance at first, but subsequent load testing showed we were overthinking it.)

Official documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html

In Elasticsearch, there is no dedicated array data type. Any field can contain zero or more values by default, however, all values in the array must be of the same data type. For instance:

an array of strings: [ "one", "two" ]
an array of integers: [ 1, 2 ]
an array of arrays: [ 1, [ 2, 3 ]] which is the equivalent of [ 1, 2, 3 ]
an array of objects: [ { "name": "Mary", "age": 12 }, { "name": "John", "age": 10 }]
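To make this concrete, here is a minimal sketch of writing one video document whose category fields are arrays, using the same Java high-level REST client as the later snippets (the ID and values are made up for illustration):

// One document per video: the many-to-many category relations collapse into
// array-valued fields, so each video ID exists exactly once in the index.
Map<String, Object> video = new HashMap<>();
video.put("uid", "10001");
video.put("parent_nature_id", Arrays.asList("1", "356")); // all first-level categories of this video
video.put("second_nature_id", Arrays.asList(3, 7));       // all second-level categories
video.put("click_cout", 120);
video.put("gmt_create", "2019-03-01T14:10:30Z");

IndexRequest request = new IndexRequest("es_score_test").id("10001").source(video);
client.index(request, RequestOptions.DEFAULT);            // client is a RestHighLevelClient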

2: Data update policy

1: The overall construction is based on our big data platform, with all behaviors updated in real time to ensure real-time retrieval. The overall architecture diagram is as follows:

2: The relationships between videos and first-level/second-level categories are largely fixed and rarely change, so we maintain them offline in HBase. Our big data platform is built on CDH 6.1, with HBase and DataNodes co-located; when YARN tasks consume a lot of CPU or memory, HBase responds slowly and C-end (consumer-facing) service responses time out. We therefore introduced AliHBase: HFiles are copied across clusters with distcp to the new cluster and then bulk-loaded into AliHBase.

hbaseFileDir=$1
tableName=$2
# hadoop2 version
date=$(date +%Y-%m-%d-%H-%M)
hadoop distcp -D dfs.replication=2 /data/hbase/${hbaseFileDir} hdfs://xxx.hbaseue.rds.aliyuncs.com:8020/data/hfile/ >> /home/bigdata/data/logs/hadoop-${date}.log 2>&1
num=$?
if [ $num -ne 0 ]; then
    echo $num
    exit 1
fi
/home/bigdata/data/alihbase-2.1.0/bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /data/hfile/${hbaseFileDir} "${tableName}" >> /home/bigdata/data/logs/hbase-${date}.log 2>&1
num2=$?
if [ $num2 -ne 0 ]; then
    echo $num2
    exit 1
fi

3: Real-time data updates: Maxwell collects the binlogs, Flink consumes them, and the data is updated to Elasticsearch in real time. The whole pipeline is built on the portrait (user profile) system, where the relevant data is configured. (I will write up the portrait system as a whole and add it here.)
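A minimal sketch of that sync job, assuming Maxwell writes its binlog JSON to a Kafka topic named maxwell, the Flink elasticsearch7 connector, and fastjson for parsing (topic, hosts, and field names are illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "video-es-sync");

// Maxwell wraps each row change as {"database":...,"table":...,"type":...,"data":{...}}
DataStream<String> binlog = env.addSource(
        new FlinkKafkaConsumer<>("maxwell", new SimpleStringSchema(), props));

List<HttpHost> esHosts = Collections.singletonList(new HttpHost("es-host", 9200, "http"));
ElasticsearchSink.Builder<String> sink = new ElasticsearchSink.Builder<>(esHosts,
        new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                JSONObject row = JSON.parseObject(element).getJSONObject("data");
                indexer.add(new IndexRequest("es_score_test")
                        .id(row.getString("uid"))
                        .source(row)); // JSONObject implements Map, usable as a source
            }
        });
sink.setBulkFlushMaxActions(100); // small bulks keep end-to-end latency low

binlog.addSink(sink.build());
env.execute("maxwell-binlog-to-es");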

3: Performance load testing

The features met the service requirements, so we ran a routine load test before release. At only 100 QPS the Elasticsearch load was already very high, so we decided to delay the release and focus on optimization first.

1: Performance optimization at the ES level. The following parameters are for reference and can be changed dynamically as needed.

{
  "es_score_test" : {
    "settings" : {
      "index" : {
        "search" : {
          "slowlog" : {
            "level" : "info"
          }
        },
        "refresh_interval" : "120s",
        "indexing" : {
          "slowlog" : {
            "level" : "debug",
            "threshold" : {
              "index" : {
                "warn" : "1000ms",
                "trace" : "800ms",
                "debug" : "800ms",
                "info" : "800ms"
              }
            },
            "source" : "1000"
          }
        },
        "number_of_shards" : "9",
        "translog" : {
          "flush_threshold_size" : "1024mb",
          "sync_interval" : "120s",
          "durability" : "async"
        },
        "provided_name" : "es_score_test",
        "max_result_window" : "1000000",
        "creation_date" : "1609815684065",
        "number_of_replicas" : "1",
        "uuid" : "ZMtszdE9SLqhUpmXgO-vfA",
        "version" : {
          "created" : "7040299"
        }
      }
    }
  }
}

number_of_replicas -> number of replicas
refresh_interval -> index refresh frequency
translog.durability -> disk synchronization policy; async means asynchronous

curl -XPUT -H "Content-Type:application/json" http://ip:9200/es_score_test/_settings -d '{ "index" : { "number_of_replicas" : "0" }}'

curl -XPUT -H "Content-Type:application/json" http://ip:9200/es_score_test/_settings -d '{ "index" : { "refresh_interval" : "60s" }}'

curl -XPUT -H "Content-Type:application/json" http://ip:9200/es_score_test/_settings -d '{ "index" : { "translog": { "durability" : "async" }}}'

2: Dynamic computation in Groovy scripts is CPU-intensive, which put the Elasticsearch cluster under heavy pressure.

1: We decided to precompute part of the score offline from user behaviors such as clicks, impressions, and favorites.

Spark SQL folds everything that can be computed offline into a single field, so the fixed portion of the score no longer has to be computed at query time.
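A sketch of that offline job with the Spark Java API; the table and column names (dw.video_behavior_week, static_score) are hypothetical, and the weights mirror the script used earlier:

SparkSession spark = SparkSession.builder()
        .appName("video-static-score")
        .enableHiveSupport()
        .getOrCreate();

// Fold every offline-computable behavior into one fixed score per video.
Dataset<Row> scored = spark.sql(
        "SELECT video_id, " +
        "       LOG2(click_cout * 1 + collect_cout * 2 + dub_cout * 3 + 1) AS static_score " +
        "FROM dw.video_behavior_week");

// The result is synced into the ES index as a plain field, so query time
// only adds static_score instead of re-running the script per document.
scored.write().mode("overwrite").saveAsTable("dw.video_static_score");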

2: Deep paging performance in Elasticsearch was abnormal; we used Scroll to solve it.

SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(INDEX_NAME);

// With from/size, Elasticsearch fetches all preceding documents before reaching the
// target page, so deep paging is done through a scrollId instead.
// SCROLL_TIME_LIMIT is set to 3 minutes here; tune it to the typical user session length.
searchRequest.scroll(new Scroll(TimeValue.timeValueSeconds(SCROLL_TIME_LIMIT)));

// Why QUERY_THEN_FETCH is used
searchRequest.searchType(SearchType.QUERY_THEN_FETCH);

// The default value is 512
searchRequest.setBatchedReduceSize(BATCHED_REDUCE_SIZE);

// Query multiple shards concurrently to improve parallelism
searchRequest.setMaxConcurrentShardRequests(Constants.NUMBER_9);

// Batch size fetched on each scroll call
searchSourceBuilder.size(num);
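The block above only builds the request; below is a sketch of the scroll loop that actually pages through the results (client is the RestHighLevelClient, and the first page comes from a normal search):

SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = response.getScrollId();

while (response.getHits().getHits().length > 0) {
    // hand this batch to the feed, then pull the next one under the same scroll context
    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
    scrollRequest.scroll(TimeValue.timeValueSeconds(SCROLL_TIME_LIMIT));
    response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
    scrollId = response.getScrollId();
}

// release the server-side scroll context as soon as paging is done
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);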

3: Remove all the Groovy scripts and move the weight calculations into FunctionScoreQueryBuilder via weight functions.

// The default field sort score
FieldValueFactorFunctionBuilder fieldValueFactorFunctionBuilder = new FieldValueFactorFunctionBuilder(scoreType);
FunctionScoreQueryBuilder.FilterFunctionBuilder fieldValueFactorFilter = new FunctionScoreQueryBuilder.FilterFunctionBuilder(fieldValueFactorFunctionBuilder);
filterFunctionBuilderList.add(fieldValueFactorFilter); 


    /**
     * Processing for inclusion filtering
     *
     * @param score
     * @param tagName
     * @param data
     * @param isTerms
     * @return
     */
 private FunctionScoreQueryBuilder.FilterFunctionBuilder getContainsFilter(Integer score, String tagName, Object data, boolean isTerms) {
        ScoreFunctionBuilder<WeightBuilder> containsScoreFunctionBuilder = new WeightBuilder();
        containsScoreFunctionBuilder.setWeight(score);
        FunctionScoreQueryBuilder.FilterFunctionBuilder containsFilter = null;
        if (isTerms) {
            containsFilter = new FunctionScoreQueryBuilder.FilterFunctionBuilder(QueryBuilders.termsQuery(tagName, (int[]) data), containsScoreFunctionBuilder);
        } else {
            containsFilter = new FunctionScoreQueryBuilder.FilterFunctionBuilder(QueryBuilders.termQuery(tagName, data), containsScoreFunctionBuilder);
        }
        return containsFilter;
    }


    /**
     * Processing for exclusion filtering
     *
     * @param score
     * @param tagName
     * @param data
     * @param isTerms
     * @return
     */
    private FunctionScoreQueryBuilder.FilterFunctionBuilder getNotContainsFilter(Integer score, String tagName, Object data, boolean isTerms) {
        ScoreFunctionBuilder<WeightBuilder> notContainsScoreFunctionBuilder = new WeightBuilder();
        notContainsScoreFunctionBuilder.setWeight(score);
        FunctionScoreQueryBuilder.FilterFunctionBuilder notConstantsFilter = null;
        if (isTerms) {
            notConstantsFilter = new FunctionScoreQueryBuilder.FilterFunctionBuilder(QueryBuilders.boolQuery().mustNot(QueryBuilders.termsQuery(tagName, (int[]) data)), notContainsScoreFunctionBuilder);
        } else {
            notConstantsFilter = new FunctionScoreQueryBuilder.FilterFunctionBuilder(QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(tagName, data)), notContainsScoreFunctionBuilder);
        }
        return notConstantsFilter;
    }
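For completeness, a sketch of how the collected filter functions might be assembled into the final query (names follow the snippets above; the match query mirrors the REST examples):

FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(
        QueryBuilders.matchQuery("parent_nature_id", "356"),
        filterFunctionBuilderList.toArray(new FunctionScoreQueryBuilder.FilterFunctionBuilder[0]))
        .boostMode(CombineFunction.SUM); // same boost_mode=sum as the REST queries above

searchSourceBuilder.query(functionScoreQuery);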

4: Creating too many scroll IDs causes performance problems, so the scroll keep-alive must not be set too long. But then how do we avoid distribution starting over from the beginning on the user's next visit? We introduced min_score: the minimum score of the previous session is passed as min_score on the next request, so scrolling does not restart from 0. For min_score, see: www.elastic.co/guide/en/el… Since Elasticsearch only supports filtering by min_score (which cannot be negative), we compute scores against a fixed maximum and reverse the sort, then store the last min_score in Redis or HBase with a TTL. This guarantees that content is not repeated within the same time window.
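A sketch of the resume logic under those assumptions; the Redis key layout, TTL, and Jedis client are illustrative:

// Scores are computed against a fixed maximum with the sort reversed, so the
// documents already served are excluded by min_score on the next request.
String key = "channel:min_score:" + deviceId;                 // hypothetical key layout
String last = jedis.get(key);                                 // Jedis client, illustrative
if (last != null) {
    searchSourceBuilder.minScore(Float.parseFloat(last));     // skip what was already delivered
}

// ... run the query and deliver the page (response is the SearchResponse) ...
SearchHit[] hits = response.getHits().getHits();
float lastServedScore = hits[hits.length - 1].getScore();     // boundary of this session
jedis.setex(key, 24 * 3600, Float.toString(lastServedScore)); // TTL bounds the no-repeat window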

5: Online load testing & gray release

1: Online load test

After the above optimizations, RT is within 50 ms, which largely meets the needs of the business. The popular page can reuse a similar scheme as the channel pages.

2: Online gray release

CRC32 is used for gray bucketing, and a UDF was added to the offline data warehouse so the relevant data analysis can be done there. The whole population is split into 10 buckets and analyzed through the data to confirm whether more buckets can be added to the experiment.

    /**
     * Calculate the device crc32 value
     *
     * @param deviceId
     * @return
     */
    protected Integer getDeviceCrc32Number(String deviceId) {
        if (StringUtils.isBlank(deviceId)) {
            return null;
        }
        CRC32 crc32 = new CRC32();
        crc32.update(deviceId.getBytes());
        Long value = (crc32.getValue()) % 10;
        Integer result = value.intValue();
        return result;
    }
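A sketch of how the bucket value might gate the gray release on the service side (the choice of bucket 0 for a 10% rollout is illustrative):

// devices hash stably into buckets 0-9; widen the rollout by allowing more buckets
Integer bucket = getDeviceCrc32Number(deviceId);
boolean inGrayRelease = bucket != null && bucket == 0; // bucket 0 -> 10% of devices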

/**
 * UDF keyed on the device ID; upload the related JAR to HDFS
 */
public final class Crc32UDF extends UDF {

    /**
     * @author tangpt
     * @param eDeviceId
     * @return
     */
    public long evaluate(String eDeviceId) {
        if (null == eDeviceId || eDeviceId.length() == 0) {
            return 0L;
        }
        CRC32 crc32 = new CRC32();
        crc32.update(eDeviceId.getBytes());
        return crc32.getValue();
    }
}

6: Summary

The above applied function_score to our business in some novel ways, and channel page clicks achieved a real breakthrough. If you have similar scenarios, you are welcome to exchange technical solutions with us.