Welcome to my GitHub

Github.com/zq2599/blog…

Contents: a categorized index of all my original articles plus their supporting source code, covering Java, Docker, Kubernetes, DevOps, etc.

Overview

The sinks that ship with Flink may not meet every need. In that case we can develop a custom sink, and in this article we will build one together.

Links to the full series

  1. "Flink Sink in Action I: A Preliminary Study"
  2. "Flink Sink in Action II: Kafka"
  3. "Flink Sink in Action III: Cassandra3"
  4. "Flink Sink in Action IV: Custom"

Inheritance relationships

  1. Before writing any code, we need to understand how sink capability is implemented. For the sinks used earlier in this series (print, Kafka and Cassandra), the inheritance relationships of the core classes are shown in the figure below:

2. As the figure shows, the key to implementing a sink is implementing the RichFunction and SinkFunction interfaces. The former handles resource management (for example, the open and close operations), while the latter performs the actual sink operation. Let's see how the simplest class, PrintSinkFunction, implements the invoke method of the SinkFunction interface:

@Override
public void invoke(IN record) {
  writer.write(record);
}
  3. With the basic logic of a sink now clear, we can start coding.
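To make the division of labor concrete, here is a minimal plain-Java sketch of the lifecycle a sink goes through: resources are acquired once in open, invoke runs once per record, and close releases resources at the end. The interface LifecycleSink and the classes BufferingPrintSink and SinkDriver are hypothetical stand-ins written for illustration; they are not Flink's real RichFunction/SinkFunction API, and the real runtime's shutdown handling is more involved than the simple finally block shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the contract Flink splits across
// RichFunction (open/close) and SinkFunction (invoke). Not Flink's real API.
interface LifecycleSink<T> {
    void open();            // acquire resources once, before any record
    void invoke(T record);  // handle a single record
    void close();           // release resources once, after the last record
}

// Print-like sink: "writes" each record to an in-memory buffer and logs
// which lifecycle method was called, so the call order can be checked.
class BufferingPrintSink implements LifecycleSink<String> {
    final List<String> events = new ArrayList<>();
    final StringBuilder output = new StringBuilder();

    @Override
    public void open() {
        events.add("open");
    }

    @Override
    public void invoke(String record) {
        output.append(record).append('\n'); // plays the role of writer.write(record)
        events.add("invoke:" + record);
    }

    @Override
    public void close() {
        events.add("close");
    }
}

// Drives a sink the way the runtime conceptually does: open once, invoke
// for every record, and close in a finally block even if invoke throws.
class SinkDriver {
    static <T> void run(LifecycleSink<T> sink, List<T> records) {
        sink.open();
        try {
            for (T record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close();
        }
    }
}
```

Running SinkDriver.run on a BufferingPrintSink with the records "a" and "b" records the call order open, invoke:a, invoke:b, close, which mirrors why a custom sink opens its database connection in open rather than in invoke.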

Content and Version

In this article, a custom sink writes data to MySQL. The versions involved are:

  1. JDK: 1.8.0_191
  2. Flink: 1.9.2
  3. Maven: 3.6.0
  4. CentOS Linux release 7.7.1908
  5. MySQL: 5.7.29
  6. IDEA: 2018.3.5 (Ultimate Edition)

Download the source code

If you'd rather not type the code yourself, the source for the whole series is available on GitHub; the addresses are listed below (github.com/zq2599/blog…):

| Name | Link | Note |
| --- | --- | --- |
| Project home page | Github.com/zq2599/blog… | The project's home page on GitHub |
| Git repository address (HTTPS) | Github.com/zq2599/blog… | The project's source repository address, HTTPS protocol |
| Git repository address (SSH) | git@github.com:zq2599/blog_demos.git | The project's source repository address, SSH protocol |

This Git project contains multiple folders; the application for this article is in the flinksinkdemo folder, as shown in the red box below:

Database preparation

Create the database flinkdemo and the table student in MySQL:

create database if not exists flinkdemo;
USE flinkdemo;
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Coding

  1. Reuse the flinksinkdemo project created in "Flink Sink in Action II: Kafka";
  2. Add the MySQL driver dependency to pom.xml:
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.11</version>
</dependency>
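Because this dependency is the 8.x Connector/J driver, its driver class is com.mysql.cj.jdbc.Driver (the 5.x driver used com.mysql.jdbc.Driver), and the connection URL has the form jdbc:mysql://host:port/database?key=value&.... The sketch below assembles such a URL from its parts using the host, port, database and parameters that the sink later passes to DriverManager.getConnection. The class MySqlJdbcUrl and its methods are hypothetical helpers for illustration, not part of the original project or of Connector/J, and the sketch assumes parameter values contain no characters that would need URL encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that assembles a MySQL Connector/J URL of the form
// jdbc:mysql://host:port/database?key1=value1&key2=value2
class MySqlJdbcUrl {
    // Driver class shipped with Connector/J 8.x (5.x used com.mysql.jdbc.Driver)
    static final String DRIVER_8X = "com.mysql.cj.jdbc.Driver";

    static String build(String host, int port, String database, Map<String, String> params) {
        StringBuilder url = new StringBuilder("jdbc:mysql://")
                .append(host).append(':').append(port)
                .append('/').append(database);
        if (!params.isEmpty()) {
            // Joins the entries as ?k1=v1&k2=v2; values are assumed to need no encoding
            StringJoiner query = new StringJoiner("&", "?", "");
            for (Map.Entry<String, String> e : params.entrySet()) {
                query.add(e.getKey() + "=" + e.getValue());
            }
            url.append(query.toString());
        }
        return url.toString();
    }

    // The parameters used in this article; LinkedHashMap keeps their order stable
    static Map<String, String> articleParams() {
        Map<String, String> p = new LinkedHashMap<>();
        p.put("useUnicode", "true");
        p.put("characterEncoding", "UTF-8");
        p.put("useSSL", "false");
        p.put("serverTimezone", "UTC");
        return p;
    }
}
```

Calling MySqlJdbcUrl.build("192.168.50.43", 3306, "flinkdemo", MySqlJdbcUrl.articleParams()) yields the same URL string that the sink class below hard-codes; building it from parts is simply one way to keep host and credentials out of the code.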
  3. Create the entity class Student.java, which maps to the student table in the database:
package com.bolingcavalry.customize;

public class Student {
    private int id;
    private String name;
    private int age;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    // Public no-arg constructor: lets Flink handle Student as a POJO type
    public Student() {
    }

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
  4. Create the custom sink class MySQLSinkFunction.java, which is the core of this article:
package com.bolingcavalry.customize;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class MySQLSinkFunction extends RichSinkFunction<Student> {

    PreparedStatement preparedStatement;

    private Connection connection;

    private ReentrantLock reentrantLock = new ReentrantLock();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Prepare the database instance
        buildPreparedStatement();
    }

    @Override
    public void close() throws Exception {
        super.close();

        try {
            if (null != preparedStatement) {
                preparedStatement.close();
                preparedStatement = null;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            if (null != connection) {
                connection.close();
                connection = null;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(Student value, Context context) throws Exception {
        // Bind the record's fields to the two placeholders and run the insert
        preparedStatement.setString(1, value.getName());
        preparedStatement.setInt(2, value.getAge());
        preparedStatement.executeUpdate();
    }

    /**
     * Obtain a connection from the MySQL database and prepare the insert statement.
     * synchronized is not used because obtaining a database connection is a remote
     * operation that takes an unknown amount of time; tryLock with a timeout avoids
     * waiting indefinitely.
     */
    private void buildPreparedStatement() {
        if(null==connection) {
            boolean hasLock = false;
            try {
                hasLock = reentrantLock.tryLock(10, TimeUnit.SECONDS);

                if(hasLock) {
                    Class.forName("com.mysql.cj.jdbc.Driver");
                    connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC", "root", "123456");
                }

                if (null != connection) {
                    preparedStatement = connection.prepareStatement("insert into student (name, age) values (?, ?)");
                }
            } catch (Exception e) {
                // Use with caution in production environment
                e.printStackTrace();
            } finally {
                if(hasLock) {
                    reentrantLock.unlock();
                }
            }
        }
    }
}
  5. The code is simple, but note two points: a lock is used to synchronize threads while the connection is being created, and with the 8.x MySQL driver both the driver class name (com.mysql.cj.jdbc.Driver) and the connection URL are written differently from the earlier 5.x driver.
  6. Create StudentSink.java, a Flink job that builds a data set from an ArrayList and passes it to addSink. To make the sink appear as its own node in the DAG, disableChaining is called to disable operator chaining:
package com.bolingcavalry.customize;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;

public class StudentSink {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parallelism is 1
        env.setParallelism(1);

        List<Student> list = new ArrayList<>();
        list.add(new Student("aaa", 11));
        list.add(new Student("bbb", 12));
        list.add(new Student("ccc", 13));
        list.add(new Student("ddd", 14));
        list.add(new Student("eee", 15));
        list.add(new Student("fff", 16));

        // Read the list as a bounded stream and write each element to MySQL;
        // disableChaining keeps the sink as a separate node in the DAG
        env.fromCollection(list)
            .addSink(new MySQLSinkFunction())
            .disableChaining();

        env.execute("sink demo : customize mysql obj");
    }
}
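MySQLSinkFunction creates its connection lazily under a ReentrantLock acquired with tryLock(10, TimeUnit.SECONDS), so a thread gives up after the timeout instead of blocking forever. The plain-Java sketch below isolates that pattern so it can run without a database: a Supplier stands in for DriverManager.getConnection, and the class name LazyResource is hypothetical. One deliberate difference from the article's code is flagged in the comments: the null check is repeated after the lock is acquired, so that if several threads race past the first check, only one of them creates the resource.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical helper: lazily creates one shared resource (standing in for a
// JDBC Connection) under a ReentrantLock acquired with a timeout.
class LazyResource<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Supplier<T> factory;
    private volatile T resource;

    LazyResource(Supplier<T> factory) {
        this.factory = factory;
    }

    // Returns the resource, or null if the lock could not be acquired in time.
    T get(long timeout, TimeUnit unit) throws InterruptedException {
        T current = resource;
        if (current != null) {
            return current;                   // fast path: already initialized
        }
        if (!lock.tryLock(timeout, unit)) {
            return null;                      // give up instead of waiting forever
        }
        try {
            if (resource == null) {           // re-check (not in the original code):
                resource = factory.get();     // another thread may have created it
            }
            return resource;
        } finally {
            lock.unlock();                    // only reached when the lock was acquired
        }
    }
}
```

With eight threads calling get concurrently, the factory runs exactly once and every thread receives the same instance. The volatile field lets threads that take the fast path see a fully published resource without acquiring the lock.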
  7. Submit the job on the Flink web UI and set the main class (com.bolingcavalry.customize.StudentSink):

8. After the job finishes, the DAG shows that the number of tasks and records is as expected. Querying the student table confirms that the data has been written:

This completes the hands-on custom sink. I hope this article serves as a useful reference.

You are welcome to follow my WeChat official account: Programmer Xin Chen

Search for "Programmer Xin Chen" on WeChat. I am Xin Chen, and I look forward to exploring the Java world with you…
