1. Visitor jump-out (bounce) details
The first step is to identify jump-out behavior by picking out the last page visited by a visitor who bounced. Such a page has two characteristics:
- It is the first page of the visit: it has no previous page, i.e. its last_page_id is empty.
- For a certain period of time after this first visit (the threshold is self-defined), the user does not visit any other page.
The first condition is easy to check: just look at last_page_id. The second condition is trickier. It is not a judgment on a single record but a combined one, involving both data that exists and data that does not, and we have to derive a record that does exist (the jump-out event) from data that does not exist. What makes it harder still is that the data is not missing forever, only missing within a certain period of time. So how do we recognize a combined behavior that includes such a timeout?
The simplest solution is Flink's built-in CEP library, which is designed to recognize an event from a combination of multiple pieces of data.
A user jump-out event is essentially the combination of a conditional event and a timeout event, as sketched below.
- Flow chart (image not shown here)
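To make the idea concrete before the full job, here is a minimal sketch of how the two conditions map onto a Flink CEP pattern. It reuses the same classes that the full code in section 3 imports (fastjson JSONObject, Pattern, SimpleCondition, Time); treat it as a preview, not a complete program:

// Sketch only: "first" matches an entry page (empty last_page_id), "next" matches any
// following page view for the same mid. If "next" does not arrive within 10 seconds,
// the partial match times out, and that timed-out "first" event is the jump-out record.
Pattern<JSONObject, JSONObject> jumpPattern = Pattern.<JSONObject>begin("first")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject event) {
                String lastPageId = event.getJSONObject("page").getString("last_page_id");
                return lastPageId == null || lastPageId.length() == 0; // entry page
            }
        })
        .next("next")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject event) {
                return event.getJSONObject("page").getString("page_id") != null; // another page was visited
            }
        })
        .within(Time.seconds(10)); // no "next" within 10 seconds => timeout => jump-out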
2. Code implementation
Create the task class UserJumpDetailApp.java to read the page logs from Kafka
package com.zhangbao.gmall.realtime.app.dwm;

import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
/**
 * @author zhangbao
 * @date 2021/10/17
 * @desc
 */
public class UserJumpDetailApp {
public static void main(String[] args) {
// For Web UI mode, the corresponding POM dependency needs to be added
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();
// Set the parallelism
env.setParallelism(4);
// Set checkpoints
// env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setCheckpointTimeout(60000);
// env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/userJumpDetail"));
// // Specifies the user that reads HDFS files
// System.setProperty("HADOOP_USER_NAME","zhangbao");
// Read data sources from kafka
String sourceTopic = "dwd_page_log";
String group = "user_jump_detail_app_group";
String sinkTopic = "dwm_user_jump_detail";
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
kafkaDs.print("user jump detail >>>");
try {
env.execute("user jump detail task");
} catch (Exception e) {
e.printStackTrace();
}
}
}
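The MyKafkaUtil helper belongs to the project's utility package and is not shown in this post. Below is a minimal sketch of what it is assumed to provide, based on how it is called here; the broker list is taken from the consumer command at the end of the post, and the project's real implementation may differ:

package com.zhangbao.gmall.realtime.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

// Assumed shape of the Kafka helper used by UserJumpDetailApp
public class MyKafkaUtil {

    private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";

    // String consumer for the given topic and consumer group
    public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", KAFKA_SERVER);
        props.setProperty("group.id", groupId);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }

    // String producer for the given topic
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        return new FlinkKafkaProducer<>(KAFKA_SERVER, topic, new SimpleStringSchema());
    }
}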
3. Flink CEP programming
The official document: nightlies.apache.org/flink/flink…
Processing flow
1. Read log data from Kafka
2. Set the time semantics to event time and specify ts as the event-time field
3. Key the stream by mid
4. Configure the CEP expression
- The first visited page: last_page_id == null
- No other page is visited within 10 seconds of the first page
5. Apply the pattern to the stream to filter the data
6. Extract the matched data
- Specify an OutputTag (timeOutTag) for the timed-out data
- Call flatSelect and implement the timeout() method of PatternFlatTimeoutFunction
- Every record emitted there via out.collect is marked as timed out
- The flatSelect() method of PatternFlatSelectFunction emits nothing, because the non-timed-out data is not needed
- Output the timed-out data through a side output stream
7. Write the jump-out data back to Kafka
package com.zhangbao.gmall.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;
/**
 * @author zhangbao
 * @date 2021/10/17
 * @desc
 */
public class UserJumpDetailApp {
public static void main(String[] args) {
// For Web UI mode, the corresponding POM dependency needs to be added
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();
// Set the parallelism
env.setParallelism(4);
// Set checkpoints
// env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setCheckpointTimeout(60000);
// env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/userJumpDetail"));
// // Specifies the user that reads HDFS files
// System.setProperty("HADOOP_USER_NAME","zhangbao");
// Read data sources from kafka
String sourceTopic = "dwd_page_log";
String group = "user_jump_detail_app_group";
String sinkTopic = "dwm_user_jump_detail";
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
DataStreamSource<String> jsonStrDs = env.addSource(kafkaSource);
/*
// Test data (used instead of the Kafka source when testing)
DataStream<String> jsonStrDs = env.fromElements(
"{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" + "\"home\"},\"ts\":15000} ",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" + "\"detail\"},\"ts\":30000} "
);
jsonStrDs.print("in json:");
*/
// Convert the structure of the read data
SingleOutputStreamOperator<JSONObject> jsonObjDs = jsonStrDs.map(jsonStr -> JSON.parseObject(jsonStr));
// jsonStrDs.print("user jump detail >>>");
// Since Flink 1.12, the default time characteristic is event time, so it does not need to be set explicitly. In earlier versions it was specified as follows:
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Specify the event time field
SingleOutputStreamOperator<JSONObject> jsonObjWithTSDs = jsonObjDs.assignTimestampsAndWatermarks(
WatermarkStrategy.<JSONObject>forMonotonousTimestamps().withTimestampAssigner(
new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObject, long l) {
return jsonObject.getLong("ts");
}
}
));
// Group by mid
KeyedStream<JSONObject, String> ketByDs = jsonObjWithTSDs.keyBy(
jsonObject -> jsonObject.getJSONObject("common").getString("mid"));
/**
 * Flink CEP pattern. A jump-out must satisfy two conditions:
 * 1. The first visited page: last_page_id == null
 * 2. Within 10 seconds of that first page, no other page is visited
 */
Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first")
.where( // 1. The first visited page: last_page_id == null
new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
System.out.println("first page >>> "+lastPageId);
if (lastPageId == null || lastPageId.length() == 0) {
return true;
}
return false;
}
}
).next("next")
.where( //2. No other operations are performed on the first page within 10 seconds
new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
String pageId = jsonObject.getJSONObject("page").getString("page_id");
System.out.println("next page >>> "+pageId);
if (pageId != null && pageId.length() > 0) {
return true;
}
return false;
}
}
// Time constraint: 10 seconds
).within(Time.milliseconds(10000));
// Apply the CEP expression to the stream to filter data
PatternStream<JSONObject> patternStream = CEP.pattern(ketByDs, pattern);
// Extract the timeout data from the filtered data and put it into the side output stream
OutputTag<String> timeOutTag = new OutputTag<String>("timeOut") {};
SingleOutputStreamOperator<Object> outputStreamDS = patternStream.flatSelect(
timeOutTag,
// Get the timed-out data
new PatternFlatTimeoutFunction<JSONObject, String>() {
@Override
public void timeout(Map<String, List<JSONObject>> map, long l, Collector<String> collector) throws Exception {
List<JSONObject> first = map.get("first");
for (JSONObject jsonObject : first) {
System.out.println("time out date >>> "+jsonObject.toJSONString());
// Every record emitted here with out.collect is marked as timed out
collector.collect(jsonObject.toJSONString());
}
}
},
// Get the non-timed-out (completed) matches
new PatternFlatSelectFunction<JSONObject, Object>() {
@Override
public void flatSelect(Map<String, List<JSONObject>> map, Collector<Object> collector) throws Exception {
// The non-timed-out data is not needed, so nothing is emitted here
}
}
);
// Get the timed-out data from the side output stream
DataStream<String> timeOutDs = outputStreamDS.getSideOutput(timeOutTag);
timeOutDs.print("jump >>> ");
// Write the jump data back to Kafka
timeOutDs.addSink(MyKafkaUtil.getKafkaSink(sinkTopic));
try {
env.execute("user jump detail task");
} catch (Exception e) {
e.printStackTrace();
}
}
}
The test data
Replace the Kafka source with fixed test data, as follows:
// Test data
DataStream<String> jsonStrDs = env
.fromElements(
"{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} "."{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}"."{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
"\"home\"},\"ts\":15000} "."{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
"\"detail\"},\"ts\":30000} "
);
dataStream.print("in json:");
Copy the code
Then consume from the dwm_user_jump_detail topic to verify the result. With this sample data, only mid 101 should come out as a jump-out, since it visits only the home page and no further event arrives for it within 10 seconds.
./kafka-console-consumer.sh --bootstrap-server hadoop101:9092,hadoop102:9092,hadoop103:9092 --topic dwm_user_jump_detail
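If you want to test end to end through Kafka instead of fixed elements, the sample records can be fed into the source topic with the console producer; a sketch assuming the same broker list (newer Kafka versions also accept --bootstrap-server here):
./kafka-console-producer.sh --broker-list hadoop101:9092,hadoop102:9092,hadoop103:9092 --topic dwd_page_log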