1. Overview
The log data collected earlier is stored in Kafka and serves as the ODS layer for log data. The logs read from the ODS layer fall into three types: page logs, startup logs, and exposure logs. Although all three are user behavior data, their structures are completely different, so they need to be split apart and processed separately. The split logs are written back to different Kafka topics, which form the log DWD layer.
Page logs are output to the main stream, startup logs to the startup side output stream, and exposure logs to the exposure side output stream.
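For orientation, here is a simplified example of a single ODS log record; the field names (common.mid, common.is_new, start, page, displays, ts) are the ones the code below reads, while the values are made up for illustration:
{
  "common": { "mid": "mid_001", "is_new": "1" },
  "page": { "page_id": "home" },
  "displays": [ { "item": "sku_1" }, { "item": "sku_2" } ],
  "ts": 1624000000000
}
A startup log would instead carry a start object in place of the page and displays fields.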
2. Identify new and existing users
The client already tags each user as new or old, but the tag is not always accurate, so the real-time job confirms it again (no business operation is involved; only the status is confirmed).
The split itself is implemented with side output streams: based on its content, the log data is divided into page logs, startup logs, and exposure logs, and each resulting stream is pushed downstream to its own Kafka topic.
3. Code implementation
Create the Flink task BaseLogTask.java under the app package.
Use Flink to consume the Kafka data and record checkpoints to HDFS. The checkpoint path must be created manually and granted the proper permissions (commands below).
Checkpointing is optional and can be turned off during testing.
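A possible way to create the checkpoint directory and grant access, assuming the path and HADOOP_USER_NAME used in the code below (adjust to your cluster):
hdfs dfs -mkdir -p /gmall/flink/checkpoint
hdfs dfs -chown -R zhangbao /gmall/flink/checkpoint
BaseLogTask.java: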
package com.zhangbao.gmall.realtime.app;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
/**
 * @author: zhangbao
 * @date: 2021/6/18
 * @desc:
 */
public class BaseLogTask {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the parallelism to match the number of Kafka partitions
env.setParallelism(4);
// Add checkpoint every 5 seconds
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseLogAll"));
// Specify the user that accesses the HDFS files
System.setProperty("HADOOP_USER_NAME", "zhangbao");
// Add data source
String topic = "ods_base_log";
String group = "base_log_app_group";
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, group);
DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
// Convert the format
SingleOutputStreamOperator<JSONObject> jsonDs = kafkaDs.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String s) throws Exception {
return JSONObject.parseObject(s);
}
});
jsonDs.print("json >>> --- ");
try {
// Execute the job
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
MyKafkaUtil.java utility class:
package com.zhangbao.gmall.realtime.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
/**
 * @author: zhangbao
 * @date: 2021/6/18
 * @desc:
 */
public class MyKafkaUtil {
private static String kafka_host = "hadoop101:9092,hadoop102:9092,hadoop103:9092";
/**
 * Kafka consumer
 */
public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String group){
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafka_host);
return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), props);
}
}
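Section 5 below also calls MyKafkaUtil.getKafkaSink(topic), which is not shown in the class above. A minimal sketch of what such a method could look like, assuming a plain FlinkKafkaProducer over the same broker list (the exact constructor varies by Flink version, so treat this as illustrative rather than the original implementation):
// Extra import needed in MyKafkaUtil:
// import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

/**
 * Kafka producer (sink) - sketch only
 */
public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafka_host);
    // Serialize records as plain strings, matching the consumer side
    return new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), props);
}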
4. State repair of new and old visitors
Rules for identifying new and old visitors
The frontend already marks each log with a new/old visitor flag (is_new), but the mark may be wrong, so it is confirmed again here. For each device (mid), a date is kept in keyed state: the date of the first visit. When a log with is_new = 1 arrives, the date held in state is compared with the current date. If the state is not empty and the two dates differ, the device is actually an old visitor, so is_new is repaired to 0; otherwise the state is updated with the current date. For example, a device whose state holds 20210617 but that sends an is_new = 1 log on 20210618 has its flag flipped to 0.
package com.zhangbao.gmall.realtime.app;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * @author: zhangbao
 * @date: 2021/6/18
 * @desc:
 */
public class BaseLogTask {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the parallelism to match the number of Kafka partitions
env.setParallelism(4);
// Add checkpoint every 5 seconds
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseLogAll"));
// Specify the user that accesses the HDFS files
System.setProperty("HADOOP_USER_NAME", "zhangbao");
// Add data source to kafka
String topic = "ods_base_log";
String group = "base_log_app_group";
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, group);
DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
// Convert the format
SingleOutputStreamOperator<JSONObject> jsonDs = kafkaDs.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String s) throws Exception {
return JSONObject.parseObject(s);
}
});
jsonDs.print("json >>> --- ");
/**
 * New/old visitor state repair: keep the first-visit date in keyed state.
 * If the state is not empty and its date differs from the current date,
 * the device is actually an old visitor, so an is_new flag of 1 is repaired to 0.
 */
// Group logs by device id (mid)
KeyedStream<JSONObject, String> midKeyedDs = jsonDs.keyBy(data -> data.getJSONObject("common").getString("mid"));
// New/old visitor state repair. Flink state comes as operator state or keyed state; since we track per-device state, keyed state is the better fit
SingleOutputStreamOperator<JSONObject> midWithNewFlagDs = midKeyedDs.map(new RichMapFunction<JSONObject, JSONObject>() {
// Define mid state
private ValueState<String> firstVisitDateState;
// Define date formatting
private SimpleDateFormat sdf;
// Initialize method
@Override
public void open(Configuration parameters) throws Exception {
firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("newMidDateState", String.class));
sdf = new SimpleDateFormat("yyyyMMdd");
}
@Override
public JSONObject map(JSONObject jsonObject) throws Exception {
// Get the current mid state
String is_new = jsonObject.getJSONObject("common").getString("is_new");
// Get the current log timestamp
Long ts = jsonObject.getLong("ts");
if ("1".equals(is_new)) {
// Visitor date status
String stateDate = firstVisitDateState.value();
String nowDate = sdf.format(new Date());
if (stateDate != null && stateDate.length() != 0 && !stateDate.equals(nowDate)) {
// Old visitor
is_new = "0";
jsonObject.getJSONObject("common").put("is_new", is_new);
} else {
// New visitor: record the current date as the first-visit date
firstVisitDateState.update(nowDate);
}
}
return jsonObject;
}
});
midWithNewFlagDs.print();
try {
// Execute the job
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
5. Use side output stream to achieve data splitting
After the new/old visitor repair above, the log data is split into three categories using side output streams.
Start log tag definition: OutputTag<String> startTag = new OutputTag<String>("start"){};
Exposure log tag definition: OutputTag<String> displayTag = new OutputTag<String>("display"){};
Page logs are output to the main stream, startup logs to the startup side output stream, and exposure logs to the exposure side output stream.
The data is split and sent to Kafka:
- dwd_start_log: startup log
- dwd_display_log: exposure log
- dwd_page_log: page log
package com.zhangbao.gmall.realtime.app;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * @author: zhangbao
 * @date: 2021/6/18
 * @desc:
 */
public class BaseLogTask {
private static final String TOPIC_START = "dwd_start_log";
private static final String TOPIC_DISPLAY = "dwd_display_log";
private static final String TOPIC_PAGE = "dwd_page_log";
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the parallelism to match the number of Kafka partitions
env.setParallelism(4);
// Add checkpoint every 5 seconds
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseLogAll"));
// Specify the user that accesses the HDFS files
System.setProperty("HADOOP_USER_NAME", "zhangbao");
// Add data source to kafka
String topic = "ods_base_log";
String group = "base_log_app_group";
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, group);
DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
// Convert the format
SingleOutputStreamOperator<JSONObject> jsonDs = kafkaDs.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String s) throws Exception {
return JSONObject.parseObject(s);
}
});
jsonDs.print("json >>> --- ");
/**
 * New/old visitor state repair: keep the first-visit date in keyed state.
 * If the state is not empty and its date differs from the current date,
 * the device is actually an old visitor, so an is_new flag of 1 is repaired to 0.
 */
// Group logs by device id (mid)
KeyedStream<JSONObject, String> midKeyedDs = jsonDs.keyBy(data -> data.getJSONObject("common").getString("mid"));
// New/old visitor state repair. Flink state comes as operator state or keyed state; since we track per-device state, keyed state is the better fit
SingleOutputStreamOperator<JSONObject> midWithNewFlagDs = midKeyedDs.map(new RichMapFunction<JSONObject, JSONObject>() {
// Define mid state
private ValueState<String> firstVisitDateState;
// Define date formatting
private SimpleDateFormat sdf;
// Initialize method
@Override
public void open(Configuration parameters) throws Exception {
firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("newMidDateState", String.class));
sdf = new SimpleDateFormat("yyyyMMdd");
}
@Override
public JSONObject map(JSONObject jsonObject) throws Exception {
// Get the current mid state
String is_new = jsonObject.getJSONObject("common").getString("is_new");
// Get the current log timestamp
Long ts = jsonObject.getLong("ts");
if ("1".equals(is_new)) {
// Visitor date status
String stateDate = firstVisitDateState.value();
String nowDate = sdf.format(new Date());
if (stateDate != null && stateDate.length() != 0 && !stateDate.equals(nowDate)) {
// Old visitor
is_new = "0";
jsonObject.getJSONObject("common").put("is_new", is_new);
} else {
// New visitor: record the current date as the first-visit date
firstVisitDateState.update(nowDate);
}
}
return jsonObject;
}
});
// midWithNewFlagDs.print();
/**
 * Split the log data into three types based on content: page logs, startup logs,
 * and exposure logs. Page logs go to the main stream, startup logs to the start
 * side output stream, and exposure logs to the exposure side output stream.
 * Side output streams serve two purposes: 1. receiving late data, 2. stream splitting.
 */
// Define the start side output stream tag; the trailing braces create an anonymous subclass so the generic type can be inferred
OutputTag<String> startTag = new OutputTag<String>("start") {};
// Define the exposure side output stream tag
OutputTag<String> displayTag = new OutputTag<String>("display") {};
SingleOutputStreamOperator<String> pageDs = midWithNewFlagDs.process(new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
String dataStr = jsonObject.toString();
JSONObject startJson = jsonObject.getJSONObject("start");
// Determine whether this is a start log
if (startJson != null && startJson.size() > 0) {
context.output(startTag, dataStr);
} else {
// Determine whether this is an exposure log
JSONArray jsonArray = jsonObject.getJSONArray("displays");
if (jsonArray != null && jsonArray.size() > 0) {
// Add pageId to each exposure
String pageId = jsonObject.getJSONObject("page").getString("page_id");
// Iterate over the output exposure log
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject disPlayObj = jsonArray.getJSONObject(i);
disPlayObj.put("page_id", pageId);
context.output(displayTag, disPlayObj.toString());
}
} else {
// Neither a start nor an exposure log, so it is a page log; output to the main stream
collector.collect(dataStr);
}
}
}
});
// Get the side output streams
DataStream<String> startDs = pageDs.getSideOutput(startTag);
DataStream<String> disPlayDs = pageDs.getSideOutput(displayTag);
// Print the output
startDs.print("start>>>");
disPlayDs.print("display>>>");
pageDs.print("page>>>");
/**
 * Send the log data of each stream to its designated Kafka topic
 */
startDs.addSink(MyKafkaUtil.getKafkaSink(TOPIC_START));
disPlayDs.addSink(MyKafkaUtil.getKafkaSink(TOPIC_DISPLAY));
pageDs.addSink(MyKafkaUtil.getKafkaSink(TOPIC_PAGE));
try {
// Execute the job
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
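To verify the split, each DWD topic can be consumed from the command line with the standard Kafka console consumer (assuming the broker address used in MyKafkaUtil):
kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic dwd_start_log
kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic dwd_display_log
kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic dwd_page_log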