I. Built-in Data Source

Flink Data Source is used to define the Data Source of Flink program. Flink officially provides a variety of Data acquisition methods to help developers build input streams simply and quickly, as follows:

1.1 Build based on files

1. ReadTextFile (path) : Reads a text file in TextInputFormat and returns its content as a string. The following is an example:

env.readTextFile(filePath).print();
Copy the code

2. ReadFile (fileInputFormat, path) : reads files in the specified format.

ReadFile (inputFormat, filePath, watchType, Interval, typeInformation) : periodically reads files in the specified format. The meanings of each parameter are as follows:

  • InputFormat: indicates the inputFormat of a data stream.
  • FilePath: indicates the filePath. It can be a local file system path or a filePath in the HDFS.
  • watchType: Read mode, which has two optional values, respectivelyFileProcessingMode.PROCESS_ONCEFileProcessingMode.PROCESS_CONTINUOUSLY: reads the data in the specified path only once and then exits. The latter means that the path is scanned and read periodically. Note that if watchType is set toPROCESS_CONTINUOUSLYWhen a file is modified, all of its contents (both old and new) will be reprocessed, thus breaking Flink’sexactly-onceSemantics.
  • Interval: indicates the interval for periodic scanning.
  • TypeInformation: The type of the element in the input stream.

The following is an example:

final String filePath = "D:\\log4j.properties";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readFile(new TextInputFormat(new Path(filePath)),
             filePath,
             FileProcessingMode.PROCESS_ONCE,
             1,
             BasicTypeInfo.STRING_TYPE_INFO).print();
env.execute();
Copy the code

1.2 Build based on collections

1. FromCollection: Based on collections, all elements in a Collection must be of the same type. The following is an example:

env.fromCollection(Arrays.asList(1.2.3.4.5)).print();
Copy the code

2. fromElements(T …) : Built on elements, all elements must be of the same type. The following is an example:

env.fromElements(1.2.3.4.5).print();
Copy the code

3. GenerateSequence (from, to) : Build based on the given sequence interval. The following is an example:

env.generateSequence(0.100);
Copy the code

FromCollection (Iterator, Class) : Build from iterators. The first parameter defines the iterator, and the second parameter defines the type of the output element. The following is an example:

env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print();
Copy the code

CustomIterator is a user-defined iterator. This section uses the data generated between 1 and 100 as an example. The Serializable interface must be Serializable as well as Iterator. If the Serializable interface is not Serializable, the Serializable interface must be Serializable.

import java.io.Serializable;
import java.util.Iterator;

public class CustomIterator implements Iterator<Integer>, Serializable {
    private Integer i = 0;

    @Override
    public boolean hasNext(a) {
        return i < 100;
    }

    @Override
    public Integer next(a) {
        i++;
        returni; }}Copy the code

5. FromParallelCollection (SplittableIterator, Class) : The SplittableIterator method takes two arguments. The second argument defines the type of the output element, and the first argument is the abstract base class for the iterator, which splits the original iterator values into multiple disjoint iterators.

1.3 Build based on Sockets

Flink provides the socketTextStream method to build socket-based data streams. The socketTextStream method takes the following four main parameters:

  • Hostname: indicates the hostname.
  • Port: indicates the port number. If this parameter is set to 0, the port number is automatically allocated.
  • Delimiter: Delimiter used to separate each record;
  • MaxRetry: Maximum retry interval (in seconds) for a program when the Socket is temporarily closed. If this parameter is set to 0, retry is not performed. If the value is set to negative, retry all the time. The following is an example:
Env. SocketTextStream (" 192.168.0.229 ", 9999, "\ n", 3). Print ();Copy the code

2. Customize Data Source

2.1 SourceFunction

In addition to the built-in data sources, users can add custom data sources using the addSource method. Custom data sources must implement the SourceFunction interface, here to generate [0, 1000) range of data as an example, the code is as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new SourceFunction<Long>() {
    
    private long count = 0L;
    private volatile boolean isRunning = true;

    public void run(SourceContext<Long> ctx) {
        while (isRunning && count < 1000) {
            // Send the input through collectctx.collect(count); count++; }}public void cancel(a) {
        isRunning = false;
    }

}).print();
env.execute();
Copy the code

2.2 ParallelSourceFunction and RichParallelSourceFunction

The setParallelism(n) method is not supported on DataStream. If the setParallelism(n) method is not supported on DataStream, the following exception will be raised:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
Copy the code

If you want to achieve with the parallelism of the input stream, you will need to implement ParallelSourceFunction or RichParallelSourceFunction interface, its relationship with SourceFunction below:

ParallelSourceFunction is a direct descendant of ParallelSourceFunction. RichParallelSourceFunction is inherited from AbstractRichFunction, realized the ParallelSourceFunction interface at the same time, so it not only has the function of parallelism, also provides additional related to life cycle method, Such as open(), closen().

Third, the Streaming Connectors

3.1 Built-in Connector

In addition to custom data sources, Flink also comes with a variety of built-in connectors for most data collection scenarios. The current built-in connectors are supported as follows:

  • Apache Kafka (support source and sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)

In addition to the connectors above, you can also extend Flink with Apache Bahir’s connectors. Apache Bahir is designed to provide functional extensions for distributed data analysis systems such as Spark and Flink. Currently, it supports the following flink-related connectors:

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

As Flink continues to evolve, we can expect it to support more and more types of Connectors. For further development of Connectors, see its official documentation: Streaming Connectors. Of all the DataSource Connectors, Kafka is the most widely used, so we use it as an example to illustrate the Connectors’ integration steps.

3.2 integrated Kakfa

1. Import dependencies

When integrating Kafka, it is important to be aware of the version of Kafka used. Maven dependencies and classes used for development vary from version to version as follows:

Maven rely on Flink version The name of the Consumer and Producer class Kafka version
Flink connector – kafka – 0.8 _2. 11 1.0.0 + FlinkKafkaConsumer08

FlinkKafkaProducer08
X 0.8.
Flink connector – kafka – 0.9 _2. 11 1.0.0 + FlinkKafkaConsumer09

FlinkKafkaProducer09
X 0.9.
Flink connector – kafka – 0.10 _2. 11 1.2.0 + FlinkKafkaConsumer010

FlinkKafkaProducer010
X 0.10.
Flink connector – kafka – 0.11 _2. 11 1.4.0 + FlinkKafkaConsumer011

FlinkKafkaProducer011
X 0.11.
Flink – connector – kafka_2. 11 1.7.0 + FlinkKafkaConsumer

FlinkKafkaProducer
> = 1.0.0

Here I use Kafka version kafka_2.12-2.2.0, add the following dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>Flink - connector - kafka_2. 11</artifactId>
    <version>1.9.0</version>
</dependency>
Copy the code

2. Code development

Here’s an example of the simplest scenario, which takes the data from Kafka and prints it:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// Specify the Kafka connection position
properties.setProperty("bootstrap.servers"."hadoop001:9092");
// Specify the topic to listen on and define the conversion rules between Kafka byte messages and Flink objects
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic".new SimpleStringSchema(), properties));
stream.print();
env.execute("Flink Streaming");
Copy the code

3.3 Integration Test

1. Start the Kakfa

Kafka relies on ZooKeeper to run, and requires a pre-boot. You can start Kafka’s built-in ZooKeeper, or start its own:

#Zookeeper startup command
bin/zkServer.sh start

#Built-in ZooKeeper startup commands
bin/zookeeper-server-start.sh config/zookeeper.properties
Copy the code

Start single-node kafka for testing:

# bin/kafka-server-start.sh config/server.properties
Copy the code

2. Create a Topic

#Create a test topic
bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                    --replication-factor 1 \
                    --partitions 1  \
                    --topic flink-stream-in-topic

#View all topics
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
Copy the code

3. Start the Producer

Start a Kafka producer to send test data:

bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic flink-stream-in-topic
Copy the code

4. Test results

Input any test data on the Producer, and then watch the output from the program console:

The output of the program console is as follows:

You can see that the relevant data has been successfully received and printed out.

The resources

  1. Data-sources:ci.apache.org/projects/fl…
  2. Streaming Connectors:ci.apache.org/projects/fl…
  3. Apache Kafka Connector: ci.apache.org/projects/fl…