
I. Introduction to Flink

1. First look at Flink

Flink grew out of the Stratosphere project, a research project carried out between 2010 and 2014 by three universities in Berlin together with several other European universities. In April 2014, the Stratosphere code was forked and donated to the Apache Software Foundation, and the initial members of the incubating project were the core developers of the Stratosphere system. In December 2014, Flink graduated to become a top-level project of the Apache Software Foundation.

Flink is a German word meaning quick and nimble, and the project chose a colorful squirrel as its logo, not only because squirrels are quick and nimble, but also because the squirrels in Berlin have an attractive reddish-brown color. The cute tail of Flink's squirrel logo echoes the colors of the Apache Software Foundation logo, making it an Apache-style squirrel.

The guiding idea of the Flink project is: “Apache Flink is an open source stream processing framework for distributed, high-performance, always-available, and accurate stream processing applications.”

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale.

2. Important features of Flink

2.1 Event-driven

Event-driven applications are a class of stateful applications that ingest events from one or more event streams and trigger computations, state updates, or external actions based on the incoming events. They typically read their events from a message queue such as Kafka.

By contrast, Spark Streaming uses a micro-batch model, as shown below:

Event-driven:
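A typical event-driven Flink application reads its events from a log such as Kafka, keeps state inside the job, and reacts to every incoming event. Below is a minimal sketch of that shape; the flink-connector-kafka_2.12 dependency, the broker address, the "user-events" topic, and the class name are made-up placeholders for illustration.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class EventDrivenDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection properties (broker address and group id are placeholders)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop102:9092");
        props.setProperty("group.id", "flink-demo");

        // Consume events from a hypothetical "user-events" topic
        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer<>("user-events", new SimpleStringSchema(), props));

        // React to each incoming event, e.g. surface the ones that look like errors
        events.filter(value -> value.contains("ERROR")).print();

        env.execute("event-driven-demo");
    }
}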

2.2 The stream and batch world views

Batch processing is characterized by bounded, persistent, large-volume data. It is ideal for computations that need access to the full set of records and is generally used for offline statistics.

Stream processing is characterized by unbounded, real-time data. It does not operate on the entire data set, but on each record as it flows through the system, and is generally used for real-time statistics.

In Spark’s world view, everything is made up of batches: offline data is one large batch, while real-time data is an infinite sequence of small batches.

In Flink’s world view, everything is made up of streams: offline data is a stream with boundaries and real-time data is a stream without boundaries, which is what is meant by bounded streams and unbounded streams.

Unbounded data streams: an unbounded stream has a start but no end. It never terminates and keeps providing data as it is generated, so it must be processed continuously, i.e. each event must be handled as soon as it is ingested. We cannot wait for all of the data to arrive, because the input is unbounded and will never be complete at any point in time. Processing unbounded data usually requires ingesting events in a specific order, such as the order in which they occurred, so that the completeness of the results can be reasoned about.

Bounded data streams: a bounded stream has a well-defined start and end, so it can be processed by ingesting all of the data before performing any computation. Ordered ingestion is not required, because a bounded data set can always be sorted afterwards. Processing bounded streams is also known as batch processing.

The biggest benefit of this stream-based architecture is its very low latency.
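To make the distinction concrete, here is a minimal sketch in which the same operator runs over a bounded stream built from a fixed collection and an unbounded stream read from a socket; the host name, port, and sample data are placeholders for illustration.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedVsUnbounded {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded stream: a fixed, finite collection of elements
        DataStream<String> bounded = env.fromElements("hello", "flink", "hello");

        // Unbounded stream: a socket source that keeps producing data until the job is cancelled
        DataStream<String> unbounded = env.socketTextStream("hadoop102", 7777);

        // The same operators apply to both kinds of stream
        bounded.filter(value -> value.startsWith("h")).print();
        unbounded.filter(value -> value.startsWith("h")).print();

        env.execute();
    }
}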

2.3 Layered APIs

On top of its core APIs, Flink also ships several library modules; a small Table & SQL sketch follows the list below:

  • Flink Table & SQL – a relational Table API and SQL support
  • Flink Gelly – a graph processing library
  • Flink CEP – a complex event processing library
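As a quick taste of the higher layers, here is a minimal Table & SQL sketch. It assumes the flink-table-api-java-bridge_2.12 and flink-table-planner_2.12 dependencies for the 1.10 line (the bridge package moved in later releases); the class name and sample data are made up for illustration.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableSqlDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Turn a DataStream into a Table with a single column named "word"
        DataStream<String> words = env.fromElements("hello", "flink", "hello");
        Table wordTable = tableEnv.fromDataStream(words, "word");

        // Express the word count as a SQL aggregation over the table
        Table result = tableEnv.sqlQuery(
                "SELECT word, COUNT(1) AS cnt FROM " + wordTable + " GROUP BY word");

        // A grouped aggregation on a stream emits updates, so convert it to a retract stream
        tableEnv.toRetractStream(result, Row.class).print();

        env.execute();
    }
}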

II. Quick start

1. Create the Maven project FlinkTutorial

Add the flink-java and flink-streaming-java dependencies (the _2.12 suffix is the Scala version the artifact is built against):

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>

2. Batch wordcount

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {

    public static void main(String[] args) throws Exception {
        // Create the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read data from a file
        String inputPath = "word.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);

        // Split each line on spaces, turn every word into a (word, 1) tuple, then count
        inputDataSet.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : value.split(" ")) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }).groupBy(0) // Group by the word in position 0 of the tuple
                .sum(1)   // Sum the counts in position 1
                .print();
    }
}
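For example, with a word.txt containing the two sample lines `hello flink` and `hello world` (made up for illustration), the job prints one (word, count) tuple per distinct word; the order of the tuples may vary:

(flink,1)
(world,1)
(hello,2)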

3. Stream wordcount

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use ParameterTool to read the host and port from the program arguments,
        // e.g. start a socket source with `nc -lk 7777` and pass `--host hadoop102 --port 7777`
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("host", "hadoop102");
        int port = parameterTool.getInt("port", 7777);

        // Read an unbounded stream of text lines from the socket
        DataStream<String> inputDataStream = env.socketTextStream(hostname, port);

        // Split each line on spaces, turn every word into a (word, 1) tuple, key by word and sum
        inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : value.split(" ")) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy(0)   // Key by the word in position 0 of the tuple
                .sum(1)   // Sum the counts in position 1
                .print();

        // Trigger execution of the streaming job
        env.execute();
    }
}
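To try it, start a socket source on the target host with `nc -lk 7777`, launch the job with `--host hadoop102 --port 7777`, and type words into the socket. Each input line produces updated counts, and print() prefixes every record with the index of the parallel subtask that emitted it, so the output looks roughly like this (the subtask indices here are just an example):

2> (hello,1)
3> (flink,1)
2> (hello,2)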