1. Overview

The log data collected earlier is stored in Kafka as the ODS layer of log data. The log data read from Kafka's ODS layer falls into three types: page logs, startup logs, and exposure logs. Although all three are user behavior data, their structures are completely different, so they need to be split and processed separately. The split logs are written back to different Kafka topics, which form the log DWD layer.

Page logs are output to the main stream, startup logs to the startup side output stream, and exposure logs to the exposure side output stream.

2. Identify new and existing users

The client side already marks users as new or old, but the mark is not accurate enough, so the real-time job confirms it again (no business operations are involved; only the state is confirmed).

Data splitting is implemented with side output streams.

According to the log content, the data is divided into three types: page logs, startup logs, and exposure logs. Each stream is then pushed downstream to its own Kafka topic; a minimal sketch of this pattern follows.
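As a minimal, self-contained illustration of the side-output pattern the job below relies on (the class name SideOutputSketch and the sample elements are only for illustration; the real BaseLogTask is built step by step in the following sections):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Tag for the side output; the curly braces create an anonymous subclass so Flink keeps the type info
        final OutputTag<String> startTag = new OutputTag<String>("start") {};

        DataStream<String> input = env.fromElements("start-log", "page-log", "page-log");

        SingleOutputStreamOperator<String> mainStream = input.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                if (value.startsWith("start")) {
                    ctx.output(startTag, value); // startup logs go to the side output
                } else {
                    out.collect(value);          // everything else stays in the main stream
                }
            }
        });
        // The main stream keeps the remaining logs; the side output carries the startup logs
        DataStream<String> startStream = mainStream.getSideOutput(startTag);

        mainStream.print("main>>>");
        startStream.print("start>>>");
        env.execute("side output sketch");
    }
}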

3. Code implementation

Create the Flink task BaseLogTask.java under the app package.

Use Flink to consume data from Kafka and write checkpoints to HDFS. Create the checkpoint path on HDFS manually and grant the required permissions.

Checkpointing is optional and can be turned off during testing.

package com.zhangbao.gmall.realtime.app;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
/**
 * @author: zhangbao
 * @date: 2021/6/18
 * @desc:
 */
public class BaseLogTask {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism, that is, the number of kafka partitions
        env.setParallelism(4);
        // Add checkpoint every 5 seconds
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseLogAll"));
        // Specify which user reads HDFS files
        System.setProperty("HADOOP_USER_NAME"."zhangbao");
        // Add data source
        String topic = "ods_base_log";
        String group = "base_log_app_group";
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, group);
        DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
        // Convert the format
        SingleOutputStreamOperator<JSONObject> jsonDs = kafkaDs.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String s) throws Exception {
                return JSONObject.parseObject(s);
            }
        });
        jsonDs.print("json >>> --- ");

        try {
            // Execute the job
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The MyKafkaUtil.java utility class:

package com.zhangbao.gmall.realtime.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
/**
 * @author: zhangbao
 * @date: 2021/6/18
 * @desc:
 */
public class MyKafkaUtil {
    private static String kafka_host = "hadoop101:9092,hadoop102:9092,hadoop103:9092";
    /**
     * Kafka consumer
     */
    public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String group){
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafka_host);
        return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), props);
    }
}

4. State repair of new and old visitors

Rules for identifying new and old visitors

The front end already records a new/old visitor flag, but it may be wrong, so it is confirmed again here. For each device (mid) the date of the first visit is kept as keyed state. When a log arrives, the date stored in state is compared with the log date: if the state is not empty and the stored date differs from the current date, the device belongs to an old visitor, and if its is_new flag is still 1, the flag is repaired (set to 0). For example, if a device's first-visit date in state is 20210617 and a log with is_new = 1 arrives on 20210618, the flag is corrected to 0.

package com.zhangbao.gmall.realtime.app;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author: zhangbao
 * @date: 2021/6/18
 * @desc:
 */
public class BaseLogTask {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism, that is, the number of kafka partitions
        env.setParallelism(4);
        // Add checkpoint every 5 seconds
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseLogAll"));
        // Specify which user reads HDFS files
        System.setProperty("HADOOP_USER_NAME"."zhangbao");
        
        // Add data source to kafka
        String topic = "ods_base_log";
        String group = "base_log_app_group";
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, group);
        DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
        // Convert the format
        SingleOutputStreamOperator<JSONObject> jsonDs = kafkaDs.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String s) throws Exception {
                return JSONObject.parseObject(s);
            }
        });
        jsonDs.print("json >>> --- ");
        /**
         * New/old visitor state repair:
         * keep the first-visit date of each mid as keyed state. If the state is not empty
         * and the stored date differs from the current date, the device is an old visitor;
         * if its is_new flag is still 1, repair it to 0.
         */
        // Group logs by ID
        KeyedStream<JSONObject, String> midKeyedDs = jsonDs.keyBy(data -> data.getJSONObject("common").getString("mid"));
        // New/old visitor state repair. State is divided into operator state and keyed state;
        // since we record the state of a single device (mid), keyed state is more appropriate.
        SingleOutputStreamOperator<JSONObject> midWithNewFlagDs = midKeyedDs.map(new RichMapFunction<JSONObject, JSONObject>() {
            // Define mid state
            private ValueState<String> firstVisitDateState;
            // Define date formatting
            private SimpleDateFormat sdf;
            // Initialize method
            @Override
            public void open(Configuration parameters) throws Exception {
                firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("newMidDateState", String.class));
                sdf = new SimpleDateFormat("yyyyMMdd");
            }
            @Override
            public JSONObject map(JSONObject jsonObject) throws Exception {
                // Get the is_new flag from the log's common field
                String is_new = jsonObject.getJSONObject("common").getString("is_new");
                // Get the current log timestamp
                Long ts = jsonObject.getLong("ts");
                if ("1".equals(is_new)) {
                    // Visitor date status
                    String stateDate = firstVisitDateState.value();
                    String nowDate = sdf.format(new Date());
                    if (stateDate != null && stateDate.length() != 0 && !stateDate.equals(nowDate)) {
                        // Old visitor
                        is_new = "0";
                        jsonObject.getJSONObject("common").put("is_new", is_new);
                    } else {
                        // New visitor: record the first-visit date
                        firstVisitDateState.update(nowDate);
                    }
                }
                return jsonObject;
            }
        });
        midWithNewFlagDs.print();

        try {
            // Execute the job
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

5. Use side output streams to split the data

After the new/old visitor repair above, the log data is split into three categories.

Startup log tag definition: OutputTag<String> startTag = new OutputTag<String>("start"){};

Exposure log tag definition: OutputTag<String> displayTag = new OutputTag<String>("display"){};

Page logs are output to the main stream, startup logs to the startup output stream, and exposure logs to the exposure log output stream.

The data is split and sent to Kafka

  • dwd_start_log: startup log
  • dwd_display_log: exposure log
  • dwd_page_log: page log
package com.zhangbao.gmall.realtime.app;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * @author: zhangbao
 * @date: 2021/6/18
 * @desc:
 */
public class BaseLogTask {
    private static final String TOPIC_START = "dwd_start_log";
    private static final String TOPIC_DISPLAY = "dwd_display_log";
    private static final String TOPIC_PAGE = "dwd_page_log";
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism, that is, the number of kafka partitions
        env.setParallelism(4);
        // Add checkpoint every 5 seconds
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseLogAll"));
        // Specify which user reads HDFS files
        System.setProperty("HADOOP_USER_NAME"."zhangbao");

        // Add data source to kafka
        String topic = "ods_base_log";
        String group = "base_log_app_group";
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, group);
        DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
        // Convert the format
        SingleOutputStreamOperator<JSONObject> jsonDs = kafkaDs.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String s) throws Exception {
                return JSONObject.parseObject(s);
            }
        });
        jsonDs.print("json >>> --- ");
        /**
         * New/old visitor state repair:
         * keep the first-visit date of each mid as keyed state. If the state is not empty
         * and the stored date differs from the current date, the device is an old visitor;
         * if its is_new flag is still 1, repair it to 0.
         */
        // Group logs by ID
        KeyedStream<JSONObject, String> midKeyedDs = jsonDs.keyBy(data -> data.getJSONObject("common").getString("mid"));
        // New/old visitor state repair. State is divided into operator state and keyed state;
        // since we record the state of a single device (mid), keyed state is more appropriate.
        SingleOutputStreamOperator<JSONObject> midWithNewFlagDs = midKeyedDs.map(new RichMapFunction<JSONObject, JSONObject>() {
            // Define mid state
            private ValueState<String> firstVisitDateState;
            // Define date formatting
            private SimpleDateFormat sdf;
            // Initialize method
            @Override
            public void open(Configuration parameters) throws Exception {
                firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("newMidDateState", String.class));
                sdf = new SimpleDateFormat("yyyyMMdd");
            }
            @Override
            public JSONObject map(JSONObject jsonObject) throws Exception {
                // Get the is_new flag from the log's common field
                String is_new = jsonObject.getJSONObject("common").getString("is_new");
                // Get the current log timestamp
                Long ts = jsonObject.getLong("ts");
                if ("1".equals(is_new)) {
                    // Visitor date status
                    String stateDate = firstVisitDateState.value();
                    String nowDate = sdf.format(new Date());
                    if (stateDate != null && stateDate.length() != 0 && !stateDate.equals(nowDate)) {
                        // Old visitor
                        is_new = "0";
                        jsonObject.getJSONObject("common").put("is_new", is_new);
                    } else {
                        // New visitor: record the first-visit date
                        firstVisitDateState.update(nowDate);
                    }
                }
                return jsonObject;
            }
        });
        // midWithNewFlagDs.print();

        /**
         * According to the log content, the data is split into three types: page logs,
         * startup logs and exposure logs. Page logs go to the main stream, startup logs
         * to the startup side output stream, and exposure logs to the exposure side output stream.
         * Side output streams are used to: 1) handle late data, 2) split a stream.
         */
        // Define the startup side output stream tag; the curly braces create an anonymous subclass so the type information is preserved
        OutputTag<String> startTag = new OutputTag<String>("start") {};
        // Define the exposure side output stream tag
        OutputTag<String> displayTag = new OutputTag<String>("display") {};
        SingleOutputStreamOperator<String> pageDs = midWithNewFlagDs.process(new ProcessFunction<JSONObject, String>() {
                    @Override
                    public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
                        String dataStr = jsonObject.toString();
                        JSONObject startJson = jsonObject.getJSONObject("start");
                        // Determine whether it is a startup log
                        if (startJson != null && startJson.size() > 0) {
                            context.output(startTag, dataStr);
                        } else {
                            // Determine whether it is an exposure log
                            JSONArray jsonArray = jsonObject.getJSONArray("displays");
                            if (jsonArray != null && jsonArray.size() > 0) {
                                // Add pageId to each exposure
                                String pageId = jsonObject.getJSONObject("page").getString("page_id");
                                // Iterate over the output exposure log
                                for (int i = 0; i < jsonArray.size(); i++) {
                                    JSONObject disPlayObj = jsonArray.getJSONObject(i);
                                    disPlayObj.put("page_id", pageId);
                                    context.output(displayTag, disPlayObj.toString());
                                }
                            } else {
                                // Not an exposure log, so it is a page log: output to the main stream
                                collector.collect(dataStr);
                            }
                        }
                    }
                });
        // Get the side output streams
        DataStream<String> startDs = pageDs.getSideOutput(startTag);
        DataStream<String> disPlayDs = pageDs.getSideOutput(displayTag);
        // Print the output
        startDs.print("start>>>");
        disPlayDs.print("display>>>");
        pageDs.print("page>>>");

        /**
         * Send the log data of each stream to its Kafka topic
         */
        startDs.addSink(MyKafkaUtil.getKafkaSink(TOPIC_START));
        disPlayDs.addSink(MyKafkaUtil.getKafkaSink(TOPIC_DISPLAY));
        pageDs.addSink(MyKafkaUtil.getKafkaSink(TOPIC_PAGE));

        try {
            // Execute the job
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
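Note that the sink calls above use MyKafkaUtil.getKafkaSink(topic), which is not part of the MyKafkaUtil class shown earlier. Below is a minimal sketch of what such a method could look like, assuming a FlinkKafkaProducer with SimpleStringSchema and the same broker list; this is an assumed implementation for illustration, not the original code.

package com.zhangbao.gmall.realtime.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// Assumed sink factory for MyKafkaUtil; only the new method is shown here.
public class MyKafkaUtil {
    private static String kafka_host = "hadoop101:9092,hadoop102:9092,hadoop103:9092";

    /**
     * Kafka producer: writes each String record to the given topic
     */
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        // FlinkKafkaProducer(brokerList, topicId, serializationSchema)
        return new FlinkKafkaProducer<String>(kafka_host, topic, new SimpleStringSchema());
    }
}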