1. Data Sinks

In a Flink job, data enters through a Data Source, passes through a series of Transformations, and the results are written out through a Sink. Data Sinks define where a data stream finally ends up. Flink provides several simple Sink APIs for everyday development:

1.1 writeAsText

writeAsText writes the computation results as text, in parallel, to the specified directory. The path argument is mandatory. An optional second argument sets the write mode, which takes one of two values:

  • WriteMode.NO_OVERWRITE: Writes the file only if nothing exists at the specified path; existing files are not overwritten.
  • WriteMode.OVERWRITE: Writes the file whether or not something already exists at the specified path. An existing file is overwritten.

The following is an example:

 streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE);

The call above writes several files in parallel, one per parallel sink task. To write the output to a single file, set the parallelism of the sink to 1:

streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
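
The difference between the two write modes can be tried out without a Flink cluster. The following is a minimal sketch that uses only the JDK file API; the class WriteModeSketch, its Mode enum, and its methods are hypothetical names for illustration and are not part of Flink. It only mirrors the behavior described above: NO_OVERWRITE leaves an existing file untouched, OVERWRITE replaces it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical JDK-only illustration of the two write modes; this is not Flink code.
class WriteModeSketch {

    enum Mode { NO_OVERWRITE, OVERWRITE }

    // Returns true if the text was written, false if NO_OVERWRITE found an existing file.
    static boolean write(Path path, String text, Mode mode) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        try {
            if (mode == Mode.NO_OVERWRITE) {
                // CREATE_NEW fails if the file already exists, so nothing is replaced.
                Files.write(path, bytes,
                        StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
            } else {
                // CREATE + TRUNCATE_EXISTING replaces whatever was there before.
                Files.write(path, bytes, StandardOpenOption.CREATE,
                        StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
            }
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes three times to the same temporary file and reports what happened.
    static String demo() {
        try {
            Path file = Files.createTempDirectory("write-mode-sketch").resolve("out.txt");
            boolean first = write(file, "first", Mode.NO_OVERWRITE);   // file is new
            boolean second = write(file, "second", Mode.NO_OVERWRITE); // file exists
            boolean third = write(file, "third", Mode.OVERWRITE);      // file replaced
            String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            return first + " " + second + " " + third + " " + content;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true false true third"
    }
}
```

In this sketch a refused write is reported as a return value to keep the demo short; how Flink itself signals an existing path in NO_OVERWRITE mode is not shown here.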

1.2 writeAsCsv

writeAsCsv writes the computation results to the specified path in CSV format. It can only be used on streams of tuples. The path argument is mandatory; the method also accepts three optional arguments: the write mode, the row delimiter, and the field delimiter. The full signature is:

writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter) 
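
To see how the two delimiters interact, here is a minimal JDK-only sketch. The class CsvFormatSketch and its format method are hypothetical and are not Flink's implementation; the sketch assumes that the fields of a row are joined with the field delimiter and that every row is terminated by the row delimiter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of row and field delimiters; this is not Flink code.
class CsvFormatSketch {

    // Joins the fields of each row with fieldDelimiter and ends each row with rowDelimiter.
    static String format(List<Object[]> rows, String rowDelimiter, String fieldDelimiter) {
        StringBuilder out = new StringBuilder();
        for (Object[] row : rows) {
            String line = Arrays.stream(row)
                    .map(String::valueOf)
                    .collect(Collectors.joining(fieldDelimiter));
            out.append(line).append(rowDelimiter);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
                new Object[]{"hei", 10},
                new Object[]{"bai", 20});
        // Prints two lines: "hei,10" and "bai,20"
        System.out.print(format(rows, "\n", ","));
    }
}
```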

1.3 print / printToErr

print and printToErr are the methods most often used in testing. They print the results to the console, on the standard output stream and the standard error stream respectively.

1.4 writeUsingOutputFormat

writeUsingOutputFormat writes the computation results using a custom output format. The writeAsText and writeAsCsv methods described above both call this method internally. The source code of writeAsText, for example, is:

public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
    TextOutputFormat<T> tof = new TextOutputFormat<>(new Path(path));
    tof.setWriteMode(writeMode);
    return writeUsingOutputFormat(tof);
}

1.5 writeToSocket

writeToSocket writes the computation results to a socket in the specified format. For example:

streamSource.writeToSocket("192.168.0.226", 9999, new SimpleStringSchema());

2. Streaming Connectors

In addition to the APIs above, Flink ships with a set of built-in connectors for reading from and writing to common storage systems and message middleware:

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Google PubSub (source/sink)

Beyond the built-in connectors, you can extend Flink with connectors from Apache Bahir, a project that provides extensions for distributed data analysis systems such as Spark and Flink. It currently offers the following Flink connectors:

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)

The next section builds on the Kafka source integration introduced in the Data Sources chapter and adds a Kafka sink. The steps are as follows.

3. Integrate Kafka Sink

3.1 addSink

Flink provides the addSink method to attach a user-defined sink or a third-party connector to a stream. To write the computation results to Kafka, pass a FlinkKafkaProducer from the Kafka connector to this method. The code is as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 1. Specify Kafka configuration properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.200.0:9092");

// 2. Consume data from Kafka
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));

// 3. Define how a computation result is converted to a Kafka ProducerRecord
KafkaSerializationSchema<String> kafkaSerializationSchema = new KafkaSerializationSchema<String>() {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        // Encode explicitly as UTF-8 (java.nio.charset.StandardCharsets) rather than the platform default
        return new ProducerRecord<>("flink-stream-out-topic", element.getBytes(StandardCharsets.UTF_8));
    }
};

// 4. Define the Flink Kafka producer (at-least-once semantics, producer pool size 5)
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("flink-stream-out-topic",
                                                                    kafkaSerializationSchema,
                                                                    properties,
                                                                    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);

// 5. Concatenate each input element with itself and write the result to Kafka
stream.map((MapFunction<String, String>) value -> value + value).addSink(kafkaProducer);
env.execute("Flink Streaming");
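
The per-record logic of this job, the map step followed by serialization, can be checked in isolation before a Kafka cluster is involved. The following is a minimal JDK-only sketch; the class RecordPipelineSketch and its methods are hypothetical names for illustration and do not use the Flink or Kafka APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for the map and serialize steps of the job; not Flink code.
class RecordPipelineSketch {

    // Same transformation as the map step: the value concatenated with itself.
    static String transform(String value) {
        return value + value;
    }

    // Encodes the element as bytes. Passing UTF-8 explicitly avoids depending on
    // the platform default charset.
    static byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        for (String input : Arrays.asList("hello", "flink")) {
            byte[] payload = serialize(transform(input));
            System.out.println(input + " -> "
                    + new String(payload, StandardCharsets.UTF_8)
                    + " (" + payload.length + " bytes)");
        }
        // prints:
        // hello -> hellohello (10 bytes)
        // flink -> flinkflink (10 bytes)
    }
}
```

So a message "hello" sent to the input topic should appear as "hellohello" on the output topic.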

3.2 Creating the Output Topic

Create a topic for the output test:

bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                    --replication-factor 1 \
                    --partitions 1  \
                    --topic flink-stream-out-topic

# List all topics
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3.3 Starting a Consumer

Start a Kafka consumer to view the output of the Flink program:

bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flink-stream-out-topic

3.4 Test Results

Send messages to the Flink program with a Kafka producer, then observe the transformed output in the consumer.

The data sent by the Kafka producer is received by the Flink program, transformed, and written to the output topic in Kafka.

4. User-defined Sink

In addition to the built-in and third-party connectors, Flink supports custom sinks for more varied output requirements. A custom sink must implement the SinkFunction interface, directly or indirectly. In practice you usually extend the abstract class RichSinkFunction, which implements SinkFunction and, through AbstractRichFunction, adds lifecycle methods such as open and close.

This section uses a custom FlinkToMySQLSink as an example; it writes the computation results to a MySQL database. The steps are as follows:

4.1 Importing Dependencies

Add the MySQL driver dependency:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.16</version>
</dependency>

4.2 User-defined Sink

Extend RichSinkFunction to implement the custom sink:

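import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Employee is the example POJO used in this article; getBirthday() must return a java.sql.Date.
public class FlinkToMySQLSink extends RichSinkFunction<Employee> {

    private PreparedStatement stmt;
    private Connection conn;

    // Called once before any record is processed: open the connection and prepare the statement
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.cj.jdbc.Driver");
        conn = DriverManager.getConnection("jdbc:mysql://192.168.0.229:3306/employees" +
                                           "?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false", "root", "123456");
        String sql = "insert into emp(name, age, birthday) values(?, ?, ?)";
        stmt = conn.prepareStatement(sql);
    }

    // Called for every record: bind the fields and execute the insert
    @Override
    public void invoke(Employee value, Context context) throws Exception {
        stmt.setString(1, value.getName());
        stmt.setInt(2, value.getAge());
        stmt.setDate(3, value.getBirthday());
        stmt.executeUpdate();
    }

    // Called on shutdown: release the statement and the connection
    @Override
    public void close() throws Exception {
        super.close();
        if (stmt != null) {
            stmt.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}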
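
The order in which these lifecycle methods run (open once, invoke for each record, close at the end) can be illustrated without Flink or a database. The following is a minimal JDK-only sketch; the interface LifecycleSink and the class SinkLifecycleSketch are hypothetical stand-ins, not Flink types, and the run method only approximates what a runtime does with a sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the open / invoke / close call order; not Flink code.
class SinkLifecycleSketch {

    // Stand-in for the lifecycle methods a rich sink overrides.
    interface LifecycleSink<T> {
        void open();
        void invoke(T value);
        void close();
    }

    // Drives a sink: open once, invoke per record, and always close afterwards.
    static <T> void run(LifecycleSink<T> sink, List<T> records) {
        sink.open();
        try {
            for (T record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close();
        }
    }

    // Runs a sink that only records which method was called, and returns the call order.
    static List<String> trace(List<String> records) {
        final List<String> calls = new ArrayList<>();
        run(new LifecycleSink<String>() {
            @Override public void open() { calls.add("open"); }
            @Override public void invoke(String value) { calls.add("invoke:" + value); }
            @Override public void close() { calls.add("close"); }
        }, records);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(trace(Arrays.asList("hei", "bai", "ying")));
        // prints [open, invoke:hei, invoke:bai, invoke:ying, close]
    }
}
```

This ordering is why the connection and prepared statement are created in open and reused by every invoke call, instead of being recreated for each record.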

4.3 Using a User-defined Sink

A user-defined sink is also attached with the addSink method:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Date date = new Date(System.currentTimeMillis());
DataStreamSource<Employee> streamSource = env.fromElements(
    new Employee("hei", 10, date),
    new Employee("bai", 20, date),
    new Employee("ying", 30, date));
streamSource.addSink(new FlinkToMySQLSink());
env.execute();

4.4 Test Results

Start the program and check the emp table in the database. If the rows have been written, the user-defined sink is integrated correctly.

The source code for all of the above use cases is in this repository: Flink-kafka-Integration
