1. Visitor jump-out details

The first step is to identify jump-out behavior, that is, to pick out the last (and only) page a jumping visitor viewed. Two characteristics identify it:

The page is the first page the visitor opens in the session. This can be checked through the previous-page field (last_page_id): if last_page_id is empty, the page is the visitor's entry page.

For a (self-defined) period of time after that first visit, the user does not go on to visit any other page.
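For reference, the page logs processed below look like this (these two records are taken from the test data at the end of this post); the first one has no last_page_id under page, so it is an entry page:

{"common":{"mid":"101"},"page":{"page_id":"home"},"ts":10000}
{"common":{"mid":"102"},"page":{"page_id":"good_list","last_page_id":"home"},"ts":15000}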

The first condition is easy to judge on a single record: just check whether last_page_id is empty. The second condition is trickier. It is not a judgment on one piece of data but a combined judgment, involving both data that exists and data that does not exist, and we have to derive a record that does exist from data that does not. What makes it harder still is that the data is not missing forever; it only has to be missing within a certain time window. So how do we recognize a combined behavior that includes such a timeout?

The simplest solution is Flink's built-in CEP library, which is designed precisely for recognizing an event from a combination of multiple pieces of data.

A user jump-out event is essentially the combination of a conditional event and a timeout event.
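In Flink CEP terms this becomes a pattern with a start condition, a strict next condition, and a within() time bound, where the jump-out is reported as the pattern's timeout rather than as a match. A minimal sketch of that shape (assuming the JSONObject page-log stream built later; imports as in the full class below):

// Skeleton of the jump-out pattern: an entry page, strictly followed by any
// other page view within 10 seconds. If the follow-up never arrives in time,
// CEP reports a timeout, and that timeout is exactly the jump-out we want.
Pattern<JSONObject, JSONObject> jumpPattern = Pattern
        .<JSONObject>begin("first")
        .where(new SimpleCondition<JSONObject>() {      // conditional event: an entry page
            @Override
            public boolean filter(JSONObject obj) throws Exception {
                return obj.getJSONObject("page").getString("last_page_id") == null;
            }
        })
        .next("next")
        .where(new SimpleCondition<JSONObject>() {      // a follow-up visit disproves the jump
            @Override
            public boolean filter(JSONObject obj) throws Exception {
                return obj.getJSONObject("page").getString("page_id") != null;
            }
        })
        .within(Time.seconds(10));                      // timeout event: nothing arrives within 10s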

  • The flow chart

2. Code implementation

Create the task class UserJumpDetailApp.java to read the page logs from Kafka

import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/**
 * @author zhangbao
 * @date 2021/10/17
 * @desc
 */
public class UserJumpDetailApp {
    public static void main(String[] args) {
        // To use the local Web UI, the corresponding POM dependency needs to be added
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();
        // Set the parallelism
        env.setParallelism(4);
        // Set checkpoints
// env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setCheckpointTimeout(60000);
// env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/userJumpDetail"));
// // Specifies the user that reads HDFS files
// System.setProperty("HADOOP_USER_NAME","zhangbao");

        // Read data sources from kafka
        String sourceTopic = "dwd_page_log";
        String group = "user_jump_detail_app_group";
        String sinkTopic = "dwm_user_jump_detail";
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
        DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);

        kafkaDs.print("user jump detail >>>");

        try {
            env.execute("user jump detail task");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
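MyKafkaUtil is a project-specific helper that is not shown in this post. A minimal sketch of what it might contain, assuming the broker list from the console-consumer command at the end of this post and the classic FlinkKafkaConsumer / FlinkKafkaProducer connectors used above:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class MyKafkaUtil {
    // Assumed broker list, taken from the kafka-console-consumer command at the end of this post
    private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";

    // String consumer for the given topic and consumer group
    public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", KAFKA_SERVER);
        props.setProperty("group.id", groupId);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }

    // String producer writing to the given topic
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        return new FlinkKafkaProducer<>(KAFKA_SERVER, topic, new SimpleStringSchema());
    }
}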

3. Flink CEP programming

The official document: nightlies.apache.org/flink/flink…

Processing flow

1. Read log data from Kafka

2. Set the time semantics to event time and specify ts as the event-time field

3. Group by MID

4. Configure the CEP expression

  • 1. First accessed page: last_page_id == null

  • 2. Within 10 seconds of that first page, no other page is visited

5. Filter streams based on expressions

6. Extract data from the pattern stream

  • Define an OutputTag (timeOutTag) for the timed-out data
  • Call flatSelect, implementing PatternFlatTimeoutFunction to handle the timed-out matches
  • Every record emitted there with collector.collect() is, by definition, jump-out data
  • The PatternFlatSelectFunction branch does nothing, because fully matched (non-timeout) data is not needed
  • Output the timeout data through the side output stream

7. Write the jump data back to Kafka

package com.zhangbao.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

/**
 * @author zhangbao
 * @date 2021/10/17
 * @desc
 */
public class UserJumpDetailApp {
    public static void main(String[] args) {
        // To use the local Web UI, the corresponding POM dependency needs to be added
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();
        // Set the parallelism
        env.setParallelism(4);
        // Set checkpoints
// env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setCheckpointTimeout(60000);
// env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/userJumpDetail"));
// // Specifies the user that reads HDFS files
// System.setProperty("HADOOP_USER_NAME","zhangbao");

        // Read data sources from kafka
        String sourceTopic = "dwd_page_log";
        String group = "user_jump_detail_app_group";
        String sinkTopic = "dwm_user_jump_detail";
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
        DataStreamSource<String> jsonStrDs = env.addSource(kafkaSource);

        /* Test data (use instead of the Kafka source when debugging):
        DataStream<String> jsonStrDs = env.fromElements(
                "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000}",
                "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
                "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":\"home\"},\"ts\":15000}",
                "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":\"detail\"},\"ts\":30000}"
        );
        jsonStrDs.print("in json:");
        */

        // Convert the structure of the read data
        SingleOutputStreamOperator<JSONObject> jsonObjDs = jsonStrDs.map(jsonStr -> JSON.parseObject(jsonStr));

// jsonStrDs.print("user jump detail >>>");
        // Since Flink 1.12 the default time characteristic is event time, so it no longer needs to be set explicitly. In earlier versions it was set as follows:
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Specify the event time field
        SingleOutputStreamOperator<JSONObject> jsonObjWithTSDs = jsonObjDs.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forMonotonousTimestamps().withTimestampAssigner(
                        new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject jsonObject, long l) {
                                return jsonObject.getLong("ts");
                            }
                        }
                )
        );

        // Group by mid
        KeyedStream<JSONObject, String> keyByDs = jsonObjWithTSDs.keyBy(
                jsonObject -> jsonObject.getJSONObject("common").getString("mid"));

        /**
         * Flink CEP pattern
         * Jump-out rule, both conditions must hold:
         * 1. The first visited page: last_page_id == null
         * 2. No other page is visited within 10 seconds of the first page
         */
        Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first")
                .where( // 1. The first visited page: last_page_id == null
                    new SimpleCondition<JSONObject>() {
                        @Override
                        public boolean filter(JSONObject jsonObject) throws Exception {
                            String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
                            System.out.println("first page >>> "+lastPageId);
                            if (lastPageId == null || lastPageId.length() == 0) {
                                return true;
                            }
                            return false;
                        }
                    }
                ).next("next")
                .where( // 2. A follow-up page visit; if none arrives within 10 seconds, the first visit is a jump-out
                        new SimpleCondition<JSONObject>() {
                            @Override
                            public boolean filter(JSONObject jsonObject) throws Exception {
                                String pageId = jsonObject.getJSONObject("page").getString("page_id");
                                System.out.println("next page >>> "+pageId);
                                if (pageId != null && pageId.length() > 0) {
                                    return true;
                                }
                                return false;
                            }
                        }
                        // Time-limited pattern: 10 seconds
                ).within(Time.milliseconds(10000));

        // Apply the CEP expression to the stream to filter data
        PatternStream<JSONObject> patternStream = CEP.pattern(keyByDs, pattern);

        // Extract the timeout data from the filtered data and put it into the side output stream
        OutputTag<String> timeOutTag = new OutputTag<String>("timeOut") {};
        SingleOutputStreamOperator<Object> outputStreamDS = patternStream.flatSelect(
                timeOutTag,
                // Get the timed-out data
                new PatternFlatTimeoutFunction<JSONObject, String>() {
                    @Override
                    public void timeout(Map<String, List<JSONObject>> map, long l, Collector<String> collector) throws Exception {
                        List<JSONObject> first = map.get("first");
                        for (JSONObject jsonObject : first) {
                            System.out.println("time out date >>> "+jsonObject.toJSONString());
                            // Everything collected here is timed-out, i.e. jump-out, data
                            collector.collect(jsonObject.toJSONString());
                        }
                    }
                },
                // Get the non-timeout (fully matched) data
                new PatternFlatSelectFunction<JSONObject, Object>() {
                    @Override
                    public void flatSelect(Map<String, List<JSONObject>> map, Collector<Object> collector) throws Exception {
                        // Non-timeout data is not needed, so nothing is done here
                    }
                });

        // Get the timeout data from the side output stream
        DataStream<String> timeOutDs = outputStreamDS.getSideOutput(timeOutTag);
        timeOutDs.print("jump >>> ");
        
        // Write the jump data back to Kafka
        timeOutDs.addSink(MyKafkaUtil.getKafkaSink(sinkTopic));

        try {
            env.execute("user jump detail task");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
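One point worth noting: with event-time semantics the timeout for a pending match only fires once the watermark passes the first event's timestamp plus 10 seconds. In the test data below that is what the mid 102 record at ts 30000 is for, and Flink also emits a final watermark when the bounded test source finishes, which flushes any remaining pending matches.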

The test data

For testing, replace the Kafka source with the fixed data below:

        // Test data
        DataStream<String> jsonStrDs = env.fromElements(
                "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000}",
                "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
                "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":\"home\"},\"ts\":15000}",
                "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":\"detail\"},\"ts\":30000}"
        );
        jsonStrDs.print("in json:");

Then consume the data from the dwm_user_jump_detail topic:

./kafka-console-consumer.sh --bootstrap-server hadoop101:9092,hadoop102:9092,hadoop103:9092 --topic dwm_user_jump_detail
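With this test data, only mid 101 should come out as a jump-out: its home visit at ts 10000 is never followed by another page within 10 seconds, while mid 102's home visit at ts 12000 is followed by good_list at ts 15000 and therefore completes the pattern normally. The consumer above should print a single record, the one for mid 101.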