Components and versions
The components used in this article follow the official example; the exact versions can be checked on GitHub and in the POM file there. This article assumes the following environments are already installed.
- JDK 1.8
- MySQL 5.6
- Hadoop 2.7.2
- Hive 2.4
- Spark 2.4.1
- Kafka 0.11
- Griffin 0.6.0
- Zookeeper 3.4.1
The detailed configuration process and the problems you may run into are described below.
Kafka data generation script
Since this is a test case, we write a script that generates data and writes it to the Kafka source topic. In a real scenario, data would be written to Kafka continuously (by Flume or some other tool). For the data script and template, refer to the official demo data.
gen-data.sh
#!/bin/bash
# current time
cur_time=`date +%Y-%m-%d_%H:%M:%S`
sed s/TIME/$cur_time/ /opt/module/data/source.temp > /opt/module/data/source.tp
# create data: pick a color for each of the 10 template rows (roughly half red, half yellow)
for row in 1 2 3 4 5 6 7 8 9 10
do
  sed -n "${row}p" < /opt/module/data/source.tp > /opt/module/data/sline
  cnt=`shuf -i1-2 -n1`
  clr="red"
  if [ $cnt == 2 ]; then clr="yellow"; fi
  sed s/COLOR/$clr/ /opt/module/data/sline >> /opt/module/data/source.data
done
rm /opt/module/data/sline
rm /opt/module/data/source.tp
# import data into the Kafka source topic
kafka-console-producer.sh --broker-list hadoop101:9092 --topic source < /opt/module/data/source.data
rm /opt/module/data/source.data
echo "insert data at ${cur_time}"
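To sanity-check the generator, you can read a few records back from the source topic. This is only a verification step, assuming the same broker address (hadoop101:9092) and topic name used above.
# read back up to 10 records to confirm the script actually produced data
kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic source --from-beginning --max-messages 10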
streaming-data.sh
#!/bin/bash
# create topics
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic target
# generate a batch of data every 90 seconds
set +e
while true
do
  /opt/module/data/gen-data.sh
  sleep 90
done
set -e
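Two optional helper commands, assuming the paths and addresses used above: list the topics to confirm they were created, and launch the generator in the background so it keeps producing data after you close the shell.
# confirm that the source and target topics exist
kafka-topics.sh --list --zookeeper hadoop101:2181
# keep the generator running in the background (the log file path is an arbitrary choice)
nohup /opt/module/data/streaming-data.sh > /opt/module/data/streaming-data.log 2>&1 &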
source.temp
{"id": 1."name": "Apple"."color": "COLOR"."time": "TIME"}
{"id": 2."name": "Banana"."color": "COLOR"."time": "TIME"}
{"id": 3."name": "Cherry"."color": "COLOR"."time": "TIME"}
{"id": 4."name": "Durian"."color": "COLOR"."time": "TIME"}
{"id": 5."name": "Lichee"."color": "COLOR"."time": "TIME"}
{"id": 6."name": "Peach"."color": "COLOR"."time": "TIME"}
{"id": 7."name": "Papaya"."color": "COLOR"."time": "TIME"}
{"id": 8."name": "Lemon"."color": "COLOR"."time": "TIME"}
{"id": 9."name": "Mango"."color": "COLOR"."time": "TIME"}
{"id": 10."name": "Pitaya"."color": "COLOR"."time": "TIME"}
Three. Flink stream processing
The Flink streaming job is divided into three parts: reading from Kafka, business processing, and writing back to Kafka. Let's start with the dependencies introduced in my pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xxxx</groupId>
    <artifactId>kafka_Flink_kafka_Test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ink.FlinkLambdaTest.FlinkToLambda</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <relocations>
                                <relocation>
                                    <pattern>org.codehaus.plexus.util</pattern>
                                    <shadedPattern>org.shaded.plexus.util</shadedPattern>
                                    <excludes>
                                        <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
                                        <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
                                    </excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--<dependency>-->
            <!--<groupId>org.apache.flink</groupId>-->
            <!--<artifactId>flink-table_2.10</artifactId>-->
            <!--<version>1.3.2</version>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20090211</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>
</project>
2. Write a bean class to receive the JSON data
import java.util.Date;

public class Student {
    private int id;
    private String name;
    private String color;
    private Date time;

    public Student() {}

    public Student(int id, String name, String color, Date time) {
        this.id = id;
        this.name = name;
        this.color = color;
        this.time = time;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getColor() {
        return color;
    }

    public void setColor(String color) {
        this.color = color;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", color='" + color + '\'' +
                ", time='" + time + '\'' +
                '}';
    }
}
Read from Kafka. The Kafka configuration information below can also be extracted into a KafkaUtil utility class.
// Create the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Be sure to set the startup checkpoint!!
//env.enableCheckpointing(5000);
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

// Kafka parameters
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop101:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
String inputTopic = "source";
String outputTopic = "target";

// Source
FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);
- For the Flink business processing, since every business is different, I only demonstrate something simple here: with a probability of 20%, a record is modified into abnormal data so that it can be detected, simulating the data quality problems that incorrect processing in the business logic may cause. One point worth stressing: this case assumes Flink's processing latency can be ignored. In a real scenario, Flink processing delay may make the target side look like data loss. I am still studying the Griffin source code on this point and will update the article later; if you understand it well, please share your guidance.
// Transformations
// Apply a simple map over the JSON records in the input stream
// example record: {"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
DataStream<String> outMap = stream.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return handleData(value);
    }
});
public static String handleData(String line) {
    try {
        if (line != null && !line.equals("")) {
            Gson gson = new GsonBuilder().setLenient().setDateFormat("yyyy-MM-dd_HH:mm:ss").create();
            JsonReader reader = new JsonReader(new StringReader(line));
            Student student = gson.fromJson(reader, Student.class);
            // with probability 2/10, corrupt the name to simulate bad processing
            int rand = ra.nextInt(10) + 1;
            if (rand > 8) student.setName(student.getName() + "_" + ra.nextInt(10));
            return gson.toJson(student);
        }
        else return "";
    } catch (Exception e) {
        return "";
    }
}
Gson is created this way (lenient, with an explicit date format) to work around a few parsing problems I ran into; see the blog posts referenced at the end of this article.
- Write the results back to Kafka. The FlinkKafkaProducer010 constructor we use here takes (brokerList, topicId, serializationSchema).
// Sink
outMap.addSink(new FlinkKafkaProducer010<String>(
        "hadoop101:9092", "target", new SimpleStringSchema()
));
outMap.print();
env.execute();
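After packaging the project with the shade plugin, the job can be submitted to a Flink cluster. This is only a sketch: the jar name follows the artifactId and version in the pom.xml above, and the main class is assumed to be the flinkProcess class shown in the global code below (add the package prefix if you place it in one).
# build the fat jar, then submit it to Flink
mvn clean package
flink run -c flinkProcess target/kafka_Flink_kafka_Test-1.0-SNAPSHOT.jar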
Configure and start Apache Griffin
Griffin's streaming mode is configured through two files, dq.json and env.json, which are walked through in detail here. dq.json:
{
  "name": "streaming_accu",
  "process.type": "streaming",
  "data.sources": [
    {
      "name": "src",
      "baseline": true,
      "connector": {
        "type": "kafka",
        "version": "0.10",
        "config": {
          "kafka.config": {
            "bootstrap.servers": "hadoop101:9092",
            "group.id": "griffin",
            "auto.offset.reset": "largest",
            "auto.commit.enable": "false"
          },
          "topics": "source_1",
          "key.type": "java.lang.String",
          "value.type": "java.lang.String"
        },
        "pre.proc": [{"dsl.type": "df-opr", "rule": "from_json"}]
      },
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs://hadoop101:9000/griffin/streaming/dump/source",
        "info.path": "source_1",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-5m", "0"],
        "updatable": true
      }
    },
    {
      "name": "tgt",
      "connector": {
        "type": "kafka",
        "version": "0.10",
        "config": {
          "kafka.config": {
            "bootstrap.servers": "hadoop101:9092",
            "group.id": "griffin",
            "auto.offset.reset": "largest",
            "auto.commit.enable": "false"
          },
          "topics": "target_1",
          "key.type": "java.lang.String",
          "value.type": "java.lang.String"
        },
        "pre.proc": [{"dsl.type": "df-opr", "rule": "from_json"}]
      },
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs://hadoop101:9000/griffin/streaming/dump/target",
        "info.path": "target_1",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-1m", "0"]
      }
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "src.login_id = tgt.login_id AND src.bussiness_id = tgt.bussiness_id AND src.event_id = tgt.event_id",
        "details": {
          "source": "src",
          "target": "tgt",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },
        "out": [
          {"type": "metric", "name": "accu"},
          {"type": "record", "name": "missRecords"}
        ]
      }
    ]
  },
  "sinks": ["HdfsSink"]
}
env.json
{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs://hadoop101:9000/griffin/checkpoint",
    "batch.interval": "20s",
    "process.interval": "1m",
    "init.clear": true,
    "config": {
      "spark.default.parallelism": 4,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "sinks": [
    {
      "name": "ConsoleSink",
      "type": "console"
    },
    {
      "name": "HdfsSink",
      "type": "hdfs",
      "config": {
        "path": "hdfs://hadoop101:9000/griffin/persist"
      }
    },
    {
      "name": "ElasticsearchSink",
      "type": "elasticsearch",
      "config": {
        "method": "post",
        "api": "http://hadoop101:9200/griffin/accuracy"
      }
    }
  ],
  "griffin.checkpoint": [
    {
      "type": "zk",
      "config": {
        "hosts": "hadoop101:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}
Finally, submit the measure job to Spark, run it, and check the data:
spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json
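To check the results, look at the HDFS sink path configured in env.json (dq.json selects the HdfsSink). A minimal check; the exact directory layout under the persist path may vary by Griffin version.
# list whatever Griffin has written to the HDFS sink so far
hdfs dfs -ls -R hdfs://hadoop101:9000/griffin/persist | head -n 20
# individual result files can then be inspected with: hdfs dfs -cat <file>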
Global code
Create a Maven project locally. Since this is just a simple test project, organize it however you like; I only wrote two classes for the test. Student:
import java.util.Date;

public class Student {
    private int id;
    private String name;
    private String color;
    private Date time;

    public Student() {}

    public Student(int id, String name, String color, Date time) {
        this.id = id;
        this.name = name;
        this.color = color;
        this.time = time;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getColor() {
        return color;
    }

    public void setColor(String color) {
        this.color = color;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", color='" + color + '\'' +
                ", time='" + time + '\'' +
                '}';
    }
}
flinkProcess:
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.io.StringReader;
import java.util.Properties;
import java.util.Random;

public class flinkProcess {

    public static Random ra = new Random();

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Be sure to set the startup checkpoint!!
        //env.enableCheckpointing(5000);
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Kafka parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop101:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        String inputTopic = "source";
        String outputTopic = "target";

        // Source
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);

        // Transformations
        // Apply a simple map over the JSON records in the input stream
        // example record: {"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
        DataStream<String> outMap = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return handleData(value);
            }
        });

        // Sink
        outMap.addSink(new FlinkKafkaProducer010<String>(
                "hadoop101:9092", "target", new SimpleStringSchema()
        ));
        outMap.print();
        env.execute();
    }

    public static String handleData(String line) {
        try {
            if (line != null && !line.equals("")) {
                Gson gson = new GsonBuilder().setLenient().setDateFormat("yyyy-MM-dd_HH:mm:ss").create();
                JsonReader reader = new JsonReader(new StringReader(line));
                Student student = gson.fromJson(reader, Student.class);
                // with probability 2/10, corrupt the name to simulate bad processing
                int rand = ra.nextInt(10) + 1;
                if (rand > 8) student.setName(student.getName() + "_" + ra.nextInt(10));
                return gson.toJson(student);
            }
            else return "";
        } catch (Exception e) {
            return "";
        }
    }
}
Note: if the data in Kafka's dataDir and the corresponding ZooKeeper znodes become inconsistent, the program will keep reporting errors. In that case, you can refer to the articles below and delete the corresponding Kafka dataDir contents and ZooKeeper znode data.
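A rough cleanup sketch is shown below. The Kafka log.dirs location is an assumption (check server.properties for your installation), ZooKeeper 3.4.x uses rmr where newer versions use deleteall, and Kafka should be stopped before deleting anything.
# 1. stop Kafka, then remove the stale partition directories from Kafka's log.dirs
#    (the /opt/module/kafka/logs path is only an example)
rm -rf /opt/module/kafka/logs/source-0 /opt/module/kafka/logs/target-0
# 2. remove the matching topic metadata from ZooKeeper
zkCli.sh -server hadoop101:2181 rmr /brokers/topics/source
zkCli.sh -server hadoop101:2181 rmr /brokers/topics/target
zkCli.sh -server hadoop101:2181 rmr /config/topics/source
zkCli.sh -server hadoop101:2181 rmr /config/topics/target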
Reference articles:
1. blog.csdn.net/weixin_4193…
2. www.jianshu.com/p/c94317de9…
3. juejin.cn/post/684490…
4. www.jianshu.com/p/da30e5d6a…
5. blog.csdn.net/qq_35731570…
6. blog.csdn.net/gaofengyan/…
7. www.cnblogs.com/garfieldcgf…
8. my.oschina.net/u/3247419/b…