Components and versions

The components used in this article follow the official Griffin example; their exact versions can be checked against the GitHub repository and the POM file inside it. This article assumes the following environment is already installed.

  • JDK 1.8
  • MySQL 5.6
  • Hadoop 2.7.2
  • Hive 2.4
  • Spark 2.4.1
  • Kafka 0.11
  • Griffin 0.6.0
  • ZooKeeper 3.4.1

The following is the detailed configuration process, along with problems you may run into.

Kafka data generation script

Since this is a test case, we write a script that generates data and pushes it into the Kafka source topic. In a real scenario, data would be written to Kafka continuously by Flume or some other tool. The data script and template below are adapted from the official demo data.

gen-data.sh

#!/bin/bash

#current time
cur_time=`date +%Y-%m-%d_%H:%M:%S`
sed s/TIME/$cur_time/ /opt/module/data/source.temp > /opt/module/data/source.tp

#create data
for row in 1 2 3 4 5 6 7 8 9 10
do
  sed -n "${row}p" < /opt/module/data/source.tp > sline
  cnt=`shuf -i1-2 -n1`
  clr="red"
  if [ $cnt == 2 ]; then clr="yellow"; fi
  sed s/COLOR/$clr/ sline >> /opt/module/data/source.data
done
rm sline

rm source.tp

#import data
kafka-console-producer.sh --broker-list hadoop101:9092 --topic source < /opt/module/data/source.data

rm source.data

echo "insert data at ${cur_time}"


streaming-data.sh

#!/bin/bash

#create topics
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic target

#generate data every 90 seconds
set +e
while true
do
  /opt/module/data/gen-data.sh
  sleep 90
done
set -e


source.temp

{"id": 1."name": "Apple"."color": "COLOR"."time": "TIME"}
{"id": 2."name": "Banana"."color": "COLOR"."time": "TIME"}
{"id": 3."name": "Cherry"."color": "COLOR"."time": "TIME"}
{"id": 4."name": "Durian"."color": "COLOR"."time": "TIME"}
{"id": 5."name": "Lichee"."color": "COLOR"."time": "TIME"}
{"id": 6."name": "Peach"."color": "COLOR"."time": "TIME"}
{"id": 7."name": "Papaya"."color": "COLOR"."time": "TIME"}
{"id": 8."name": "Lemon"."color": "COLOR"."time": "TIME"}
{"id": 9."name": "Mango"."color": "COLOR"."time": "TIME"}
{"id": 10."name": "Pitaya"."color": "COLOR"."time": "TIME"}

Flink stream processing

The Flink streaming job is divided into three parts: reading from Kafka, business processing, and writing back to Kafka. Let's start with the dependencies introduced in my pom.xml.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xxxx</groupId>
    <artifactId>kafka_Flink_kafka_Test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>



            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ink.FlinkLambdaTest.FlinkToLambda</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <relocations>
                                <relocation>
                                    <pattern>org.codehaus.plexus.util</pattern>
                                    <shadedPattern>org.shaded.plexus.util</shadedPattern>
                                    <excludes>
                                        <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
                                        <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
                                    </excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!--<dependency>-->
            <!--<groupId>org.apache.flink</groupId>-->
            <!--<artifactId>flink-table_2.10</artifactId>-->
            <!--<version>1.3.2</version>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20090211</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>



        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>

</project>

  2. Write a bean class to receive the JSON data.

import java.util.Date;

public class Student{
    private int id;
    private String name;
    private String color;
    private Date time;

    public Student(){}

    public Student(int id, String name, String color, Date time) {
        this.id = id;
        this.name = name;
        this.color = color;
        this.time = time;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getColor() {
        return color;
    }

    public void setColor(String color) {
        this.color = color;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\' ' +
                ", color='" + color + '\' ' +
                ", time='" + time + '\' ' +
                '} '; }}Copy the code

  3. Read from the Kafka source. The Kafka configuration below can also be pulled out into a KafkaUtil utility class; a sketch of such a class follows the snippet.

// Create Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Be sure to set the startup checkpoint!!
        //env.enableCheckpointing(5000);
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Kafka parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop101:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        String inputTopic = "source";
        String outputTopic = "target";

        // Source
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);
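The article only mentions a KafkaUtil class without showing it, so here is a minimal sketch of what such a utility class could look like; the class name, constant, and method are hypothetical and simply bundle the properties used above.

import java.util.Properties;

// Hypothetical helper: centralizes the Kafka connection settings used by the job.
public class KafkaUtil {

    public static final String BROKERS = "hadoop101:9092";

    // Builds the consumer properties passed to FlinkKafkaConsumer010.
    public static Properties consumerProps(String groupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", BROKERS);
        properties.setProperty("group.id", groupId);
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        return properties;
    }
}

With this in place, the source could be created as new FlinkKafkaConsumer010<>(inputTopic, new SimpleStringSchema(), KafkaUtil.consumerProps("consumer-group")).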
  4. Flink business processing. Since every business is different, this demo simply modifies roughly 20% of the records to turn them into abnormal data, simulating the quality problems that incorrect business processing could cause. Note that this case assumes the Flink processing delay can be ignored; in a real scenario, processing delay might make the target side look like it has lost data. I am still studying the Griffin source code on this point and will update the article later; pointers from readers who understand it are welcome.
// Use Flink operator to process data simply
        // Transformations
        // Map each JSON record from the input stream through handleData
        //{"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
        DataStream<String> outMap = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return handleData(value);
            }
        });
public static String handleData(String line){
        try {
                if (line != null && !line.equals("")) {
                    Gson gson = new GsonBuilder().setLenient().setDateFormat("yyyy-MM-dd_HH:mm:ss").create();
                    JsonReader reader = new JsonReader(new StringReader(line));
                    Student student = gson.fromJson(reader, Student.class);
                    int rand = ra.nextInt(10) + 1;
                    if (rand > 8) student.setName(student.getName() + "_" + ra.nextInt(10));
                    return gson.toJson(student);
                }
                else return "";
        } catch (Exception e) {
            return "";
        }
    }

Gson is built this way (lenient mode plus an explicit date format) to work around a few parsing problems; see the reference articles at the end of this post. A minimal check of the setup is shown below.
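This is a throwaway sketch, not part of the job; it assumes the Student bean above, and the class name GsonDateCheck and the sample timestamp are made up for illustration. The point is that the demo records separate date and time with an underscore, which Gson's default date handling does not accept, so the pattern has to be set explicitly.

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class GsonDateCheck {
    public static void main(String[] args) {
        // Lenient parsing plus the underscore date pattern used by gen-data.sh.
        Gson gson = new GsonBuilder()
                .setLenient()
                .setDateFormat("yyyy-MM-dd_HH:mm:ss")
                .create();

        String line = "{\"id\": 1, \"name\": \"Apple\", \"color\": \"red\", \"time\": \"2021-05-01_10:30:00\"}";
        Student student = gson.fromJson(line, Student.class);
        System.out.println(student); // e.g. Student{id=1, name='Apple', color='red', time=...}
    }
}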

  5. Write the results back to Kafka. The FlinkKafkaProducer010 constructor chosen here is (brokerList, topicId, serializationSchema).
//Sink
        outMap.addSink(new FlinkKafkaProducer010<String>(
                "hadoop101:9092"."target".new SimpleStringSchema()
        ));
        outMap.print();
        env.execute();
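If the write to Kafka needs stronger guarantees, the 0.10 producer also exposes a couple of switches inherited from the connector's base class. This is an optional hardening sketch, not part of the original job, and it only takes effect once checkpointing (commented out above) is enabled.

// Sketch: fail the job on produce errors instead of only logging them, and
// flush pending records on checkpoints for at-least-once delivery.
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
        "hadoop101:9092", "target", new SimpleStringSchema());
producer.setLogFailuresOnly(false);
producer.setFlushOnCheckpoint(true);
outMap.addSink(producer);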

Configure and start Apache Griffin

Griffin's streaming mode is configured through two files, dq.json and env.json. dq.json defines the two Kafka data sources (src and tgt) and the accuracy rule: Griffin counts how many source records find a matching target record (matched_count out of total_count) and writes the unmatched ones out as missRecords.

{
  "name": "streaming_accu",
  "process.type": "streaming",
  "data.sources": [
    {
      "name": "src",
      "baseline": true,
      "connector": {
        "type": "kafka",
        "version": "0.10",
        "config": {
          "kafka.config": {
            "bootstrap.servers": "hadoop101:9092",
            "group.id": "griffin",
            "auto.offset.reset": "largest",
            "auto.commit.enable": "false"
          },
          "topics": "source_1",
          "key.type": "java.lang.String",
          "value.type": "java.lang.String"
        },
        "pre.proc": [
          { "dsl.type": "df-opr", "rule": "from_json" }
        ]
      },
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs://hadoop101:9000/griffin/streaming/dump/source",
        "info.path": "source_1",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-5m", "0"],
        "updatable": true
      }
    },
    {
      "name": "tgt",
      "connector": {
        "type": "kafka",
        "version": "0.10",
        "config": {
          "kafka.config": {
            "bootstrap.servers": "hadoop101:9092",
            "group.id": "griffin",
            "auto.offset.reset": "largest",
            "auto.commit.enable": "false"
          },
          "topics": "target_1",
          "key.type": "java.lang.String",
          "value.type": "java.lang.String"
        },
        "pre.proc": [
          { "dsl.type": "df-opr", "rule": "from_json" }
        ]
      },
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs://hadoop101:9000/griffin/streaming/dump/target",
        "info.path": "target_1",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-1m", "0"]
      }
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "src.login_id = tgt.login_id AND src.bussiness_id = tgt.bussiness_id AND src.event_id = tgt.event_id",
        "details": {
          "source": "src",
          "target": "tgt",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },
        "out": [
          { "type": "metric", "name": "accu" },
          { "type": "record", "name": "missRecords" }
        ]
      }
    ]
  },
  "sinks": ["HdfsSink"]
}

env.json

{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs://hadoop101:9000/griffin/checkpoint",
    "batch.interval": "20s",
    "process.interval": "1m",
    "init.clear": true,
    "config": {
      "spark.default.parallelism": 4,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "sinks": [
    {
      "name": "ConsoleSink",
      "type": "console"
    },
    {
      "name": "HdfsSink",
      "type": "hdfs",
      "config": {
        "path": "hdfs://hadoop101:9000/griffin/persist"
      }
    },
    {
      "name": "ElasticsearchSink",
      "type": "elasticsearch",
      "config": {
        "method": "post",
        "api": "http://hadoop101:9200/griffin/accuracy"
      }
    }
  ],
  "griffin.checkpoint": [
    {
      "type": "zk",
      "config": {
        "hosts": "hadoop101:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}

Finally, submit the measure job to Spark and check the results:

spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json

Global code

Create a Maven project locally. Since this is a simple test project, organize it however you like; I only wrote two classes for the test. Student.class:

import java.util.Date;

public class Student{
    private int id;
    private String name;
    private String color;
    private Date time;

    public Student(){}

    public Student(int id, String name, String color, Date time) {
        this.id = id;
        this.name = name;
        this.color = color;
        this.time = time;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getColor() {
        return color;
    }

    public void setColor(String color) {
        this.color = color;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\' ' +
                ", color='" + color + '\' ' +
                ", time='" + time + '\' ' +
                '} '; }}Copy the code

flinkProcess.class


import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.io.StringReader;
import java.util.Properties;
import java.util.Random;

public class flinkProcess {
    public static Random ra = new Random();
    public static void main(String[] args) throws Exception {
        // Create Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Be sure to set the startup checkpoint!!
        //env.enableCheckpointing(5000);
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Kafka parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop101:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        String inputTopic = "source";
        String outputTopic = "target";

        // Source
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);

        // Use Flink operator to process data simply
        // Transformations
        // Map each JSON record from the input stream through handleData
        //{"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
        DataStream<String> outMap = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return handleData(value);
            }
        });

        // Sink
        outMap.addSink(new FlinkKafkaProducer010<String>(
                "hadoop101:9092"."target".new SimpleStringSchema()
        ));
        outMap.print();
        env.execute();
    }

    public static String handleData(String line){
        try {
                if (line != null && !line.equals("")) {
                    Gson gson = new GsonBuilder().setLenient().setDateFormat("yyyy-MM-dd_HH:mm:ss").create();
                    JsonReader reader = new JsonReader(new StringReader(line));
                    Student student = gson.fromJson(reader, Student.class);
                    int rand = ra.nextInt(10) + 1;
                    if (rand > 8) student.setName(student.getName() + "_" + ra.nextInt(10));
                    return gson.toJson(student);
                }
                else return "";
        } catch (Exception e) {
            return "";
        }
    }
}

If the Kafka dataDir and the topic metadata stored in ZooKeeper's znodes get out of sync, the program will keep reporting errors. In that case, delete the corresponding Kafka dataDir contents and the ZooKeeper znode data; the reference articles below describe the procedure.

Reference articles:

  1. blog.csdn.net/weixin_4193…
  2. www.jianshu.com/p/c94317de9…
  3. juejin.cn/post/684490…
  4. www.jianshu.com/p/da30e5d6a…
  5. blog.csdn.net/qq_35731570…
  6. blog.csdn.net/gaofengyan/…
  7. www.cnblogs.com/garfieldcgf…
  8. my.oschina.net/u/3247419/b…