Background

In a recent project we needed to transfer data between systems, so I picked up some Flink basics. This post records and organizes what I learned through a simple integration, so that beginners can see the complete flow of Flink streaming data processing. The idea of the demo is that an application system periodically sends data to Kafka, and Flink reads the streaming data from Kafka and stores it in a MySQL database.

I was still a Flink beginner when writing these notes, so some parts are inevitably incomplete or wrong; please bear with me. If this article happens to help you in your work or study, I am honored.

I. Environment requirements

1. Required software: Kafka (2.13-2.7.0), ZooKeeper (3.5.5), JDK 1.8, MySQL (8.0.15)

2. ZooKeeper environment: Unpack the downloaded ZooKeeper installation package to an appropriate location (Kafka requires a running ZooKeeper), go to the conf directory, and edit the zoo.cfg file (if zoo.cfg does not exist, rename the zoo_sample.cfg file to zoo.cfg), setting the following entries:

dataDir=D:\software\bigData\zookeeper\3.5.5\data
dataLogDir=D:\software\bigData\zookeeper\3.5.5\data\log

In a command-line window, go to the ZooKeeper installation directory and run the following command to start the ZooKeeper service on Windows. The default service port is 2181.

.\bin\zkServer.cmd

If no errors appear in the startup log, ZooKeeper has started successfully; proceed with the following steps.
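
If you want to double-check that ZooKeeper is reachable, you can connect to it with the bundled client (a quick sanity check, assuming the default port 2181):

.\bin\zkCli.cmd -server localhost:2181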

3. Kafka environment: Decompress the downloaded Kafka installation package to an appropriate location, go to the config directory, and modify the server.properties file:

# Log directory
log.dirs=./logs
# ZooKeeper address and port
zookeeper.connect=localhost:2181

Run the following command to start the Kafka service. If no error message is displayed on the console, the service has started successfully.

.\bin\windows\kafka-server-start.bat .\config\server.properties
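
The demo consumes a topic named student. Kafka can create it automatically on first use, but you can also create and verify it explicitly with the bundled scripts (a sketch, assuming a single local broker on port 9092):

.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic student
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092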

II. Demo development

1. Create the MySQL table

CREATE TABLE `student`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `age` bigint(20) NULL DEFAULT NULL,
  `createDate` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
);

2. Add the Maven dependencies required by the project

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.10.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.influxdb</groupId>
        <artifactId>influxdb-java</artifactId>
        <version>2.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.15</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.22</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.71</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
        <version>4.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.25</version>
        <scope>test</scope>
    </dependency>
</dependencies>
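
The dependency list above does not include a build section. If you want to package the job into a runnable fat JAR (as the 'mvn clean package' note in the main-function javadoc below suggests), a maven-shade-plugin configuration along the following lines is commonly used. This is only a sketch rather than the project's actual pom, and the mainClass value assumes the entry class shown later in this post:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.nandy.FlinkReadDbWriterDb</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>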

3. Create a database connection pool with Druid

package com.nandy.mysql.utils;

import com.alibaba.druid.pool.DruidDataSourceFactory;
import lombok.extern.slf4j.Slf4j;

import javax.sql.DataSource;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

/**
 * @author nandy
 * @create 2021/3/2
 */
@Slf4j
public class ConnectionPoolUtils {

    private static final Properties properties;
    private static Connection conn;

    static {
        properties = new Properties();
        ClassLoader classLoader = ConnectionPoolUtils.class.getClassLoader();
        InputStream resourceAsStream = classLoader.getResourceAsStream("jdbc.properties");
        // Load the configuration file
        try {
            properties.load(resourceAsStream);
        } catch (IOException e) {
            log.error("Load jdbc properties exception."); }}private ConnectionPoolUtils(a) {}public static Connection getConnection(a) throws SQLException {

        try {
            DataSource dataSource = DruidDataSourceFactory.createDataSource(properties);
            conn = dataSource.getConnection();
        } catch (SQLException e) {
            log.error("error occurred :" + e);
            conn = null;
        } catch (Exception e) {
            log.error("error occurred while creating connection pool!");
        }
        return conn;
    }
}
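
Before wiring the pool into a Flink job, it can be worth verifying the JDBC configuration in isolation. Below is a minimal, hypothetical check (the class name JdbcSmokeTest is illustrative and not part of the project); it simply borrows a connection from the pool and runs SELECT 1:

package com.nandy.mysql.utils;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class JdbcSmokeTest {
    public static void main(String[] args) throws Exception {
        // Borrow a connection from the Druid pool configured via jdbc.properties
        Connection connection = ConnectionPoolUtils.getConnection();
        if (connection == null) {
            System.out.println("Could not obtain a connection, check jdbc.properties");
            return;
        }
        try (Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery("SELECT 1")) {
            if (rs.next()) {
                System.out.println("MySQL connection OK: " + rs.getInt(1));
            }
        } finally {
            connection.close();
        }
    }
}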

jdbc.properties

driverClassName=com.mysql.cj.jdbc.Driver
url=jdbc:mysql://localhost:3306/flink_study?serverTimezone=UTC
username=root
password=xxx
filters=stat
initialSize=5
maxActive=50
maxWait=60000
timeBetweenEvictionRunsMillis=60000
minEvictableIdleTimeMillis=300000
validationQuery=SELECT 1
testWhileIdle=true
testOnBorrow=false
testOnReturn=false
poolPreparedStatements=false
maxPoolPreparedStatementPerConnectionSize=200

log4j.properties

log4j.rootLogger=info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

4. Create the Student entity class that maps to the database table

package com.nandy.models;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

/**
 * @author nandy
 * @create 2021/3/2 18:54
 */
@Setter
@Getter
public class Student implements Serializable {

    private static final long serialVersionUID = -3247106837870523911L;

    private int id;

    private String name;

    private int age;

    private String createDate;
}
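
For reference, fastjson serializes a Student instance into a flat JSON object, so the messages on the Kafka topic look roughly like the following (an illustrative example; the exact field order and timestamp format come from fastjson and LocalDateTime.now().toString()):

{"age":73,"createDate":"2021-03-04T18:54:11.123","id":0,"name":"nandy73"}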

5. The Flink sink that writes to MySQL

package com.nandy.mysql;

import com.nandy.models.Student;
import com.nandy.mysql.utils.ConnectionPoolUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

/**
 * @author nandy
 * @create 2021/3/2 17:13
 */
@Slf4j
public class Flink2JdbcWriter extends RichSinkFunction<List<Student>> {
    private static final long serialVersionUID = -5072869539213229634L;


    private transient Connection connection = null;
    private transient PreparedStatement ps = null;
    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        connection = ConnectionPoolUtils.getConnection();
        if (null != connection) {
            ps = connection.prepareStatement("insert into student (name, age, createDate) values (?, ?, ?);");
        }
    }

    @Override
    public void invoke(List<Student> list, Context context) throws Exception {

        if (isRunning && null != ps) {
            for (Student one : list) {
                ps.setString(1, one.getName());
                ps.setInt(2, one.getAge());
                ps.setString(3, one.getCreateDate());
                ps.addBatch();
            }
            int[] count = ps.executeBatch();
            log.info("Number of records written to MySQL: " + count.length);
        }
    }

    @Override
    public void close() throws Exception {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        isRunning = false;
    }
}

6. Main function


package com.nandy;

import com.alibaba.fastjson.JSON;
import com.nandy.models.Student;
import com.nandy.mysql.Flink2JdbcWriter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Properties;

/**
 * Skeleton for a Flink Streaming Job.
 *
 * <p>For a tutorial how to write a Flink streaming application, check the
 * tutorials and examples on the Flink Website.
 *
 * <p>To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * <p>If you change the name of the main class (with the public static void main(String[] args))
 * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
 */

@Slf4j
public class FlinkReadDbWriterDb {

    public static void main(String[] args) throws Exception {
        // Build the stream execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.10.42:9092");
        props.put("zookeeper.connect", "192.168.10.42:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>(
                        // The Kafka topic; it must match the topic used by the test producer class
                        "student",
                        new SimpleStringSchema(),
                        props))
                // Read with a single parallel task so the console output is not interleaved; this does not affect the result
                .setParallelism(1);

        // Read data from Kafka and convert each JSON message to a Student object
        DataStream<Student> dataStream = dataStreamSource.map(string -> JSON.parseObject(string, Student.class));

        // Collect the records received in each 5-second window into a list
        dataStream.timeWindowAll(Time.seconds(5L)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<Student> iterable, Collector<List<Student>> collector) throws Exception {
                List<Student> students = Lists.newArrayList(iterable);
                if (CollectionUtils.isNotEmpty(students)) {
                    log.info("Total number of messages received in 5 seconds: " + students.size());
                    collector.collect(students);
                }
            }
            // Sink the collected list to the database
        }).addSink(new Flink2JdbcWriter());

        env.execute("Flink Streaming Java API Skeleton");
    }
}
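
For a quick test you can run FlinkReadDbWriterDb directly from the IDE. To submit it to a Flink cluster instead, package the project and use the flink CLI; the commands below are only a sketch, and the JAR file name depends on your pom.xml and is just an example:

mvn clean package
flink run -c com.nandy.FlinkReadDbWriterDb target/flink-study-1.0-SNAPSHOT.jar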

III. Test and results

1. Create a test class that sends data to the Kafka topic student

package com.nandy.kafka;

import com.alibaba.fastjson.JSON;
import com.nandy.models.Student;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @author nandy
 * @create 2021/3/4
 */
@Slf4j
public class KafkaWriter {
    // A list of local Kafka machines
    public static final String BROKER_LIST = "192.168.10.42:9092";
    // Kafka topic
    public static final String TOPIC_PERSON = "student";
    // Key is serialized as a string
    public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    // The serialization of value
    public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    public static void writeToKafka(a) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("key.serializer", KEY_SERIALIZER);
        props.put("value.serializer", VALUE_SERIALIZER);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Create a Student object; the name is "nandy" followed by a random number
            int randomInt = RandomUtils.nextInt(1, 100000);
            Student student = new Student();
            student.setName("nandy" + randomInt);
            student.setAge(randomInt);
            student.setCreateDate(LocalDateTime.now().toString());
            // Convert to JSON
            String personJson = JSON.toJSONString(student);

            // Wrap kafka to send records
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_PERSON, null, null, personJson);
            // Send to cache
            producer.send(record);
            log.info("Send data to Kafka :" + personJson);
            // Send it immediately
            producer.flush();
        }
    }

    public static void main(String[] args) {
        int count = 0;
        while (count < 20) {
            try {
                // Write data every three seconds
                TimeUnit.SECONDS.sleep(3);
                writeToKafka();
                count++;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
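
If the database stays empty, you can first check whether the messages actually reach Kafka by attaching a console consumer to the topic (a quick check, assuming the broker address used above):

.\bin\windows\kafka-console-consumer.bat --bootstrap-server 192.168.10.42:9092 --topic student --from-beginning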

2. Start the Flink job (FlinkReadDbWriterDb) and then run the KafkaWriter test class; after a few seconds, query the student table with select * from student to confirm that the data has been written to MySQL.