
Characteristics of Flink

  • Event-driven

  • Stream-based processing

    Everything is made up of streams: offline data is a bounded stream, and real-time data is an unbounded stream (bounded streams vs. unbounded streams).

  • Layered API

    • The higher and more abstract the layer, the more concise the expression and the more convenient it is to use
    • The lower and more concrete the layer, the richer the expressiveness and the more flexible the use (the sketch below shows the same logic written at both levels)
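To make the trade-off concrete, here is a minimal sketch (the class name and sample data are illustrative, not from the original post) that writes the same filter once with the concise high-level DataStream API and once with the low-level ProcessFunction, which is more verbose but exposes state, timers, and side outputs:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class LayeredApiSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> words = env.fromElements("hello", "flink", "streaming");

        // Top layer: one line states the intent
        words.filter(w -> w.length() <= 5).print();

        // Bottom layer: more code, but full control (state, timers, side outputs)
        words.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                if (value.length() <= 5) {
                    out.collect(value);
                }
            }
        }).print();

        env.execute();
    }
}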

Flink vs Spark Streaming

  • Data model
    • Spark uses the RDD model; Spark Streaming's DStream is really just a collection of RDDs over small batches of data
    • Flink's basic data model is the data stream, i.e. a sequence of events
  • Runtime architecture
    • Spark is batch computing: it divides the DAG into stages, and one stage must finish before the next can be computed
    • Flink uses a standard stream execution model: an event processed by one node can be sent directly on to the next node for processing

Quick start

Batch implementation of WordCount

flink-streaming-java_2.12:1.12.1 => org.apache.flink:flink-runtime_2.12:1.12.1 => com.typesafe.akka:akka-actor_2.12:2.5.21. Akka is implemented in Scala, so even though we are using the Java language here, we are pulling in Scala implementation packages.

POM dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.caicai</groupId>
    <artifactId>Flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

</project>

Preparation

First, prepare a file with some simple data for the subsequent Flink computation and analysis. Create a new hello.txt file in the resources directory and save some data in it:

hello java
hello flink
hello scala
hello spark
hello storm
how are you
fine thank you
and you

Code implementation

package com.caicai;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author wangcaicai
 * @date 2021/11/18 0:51
 * @description batch processing
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Create execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read data from a file
        String inputPath = "E:\\java\\WorkSpace\\Flink\\src\\main\\resources\\hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);

        // Split each line on spaces to expand it into words, and convert each word into a (word, 1) two-tuple for counting
        // Group by the word at position 0
        // Sum the counts at position 1
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0)
                .sum(1);
        resultSet.print();
    }

    // Implement the FlatMapFunction interface
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the line on spaces
            String[] words = s.split(" ");
            // Iterate over all the words and emit each one as a two-tuple
            for (String str : words) {
                out.collect(new Tuple2<>(str, 1));
            }
        }
    }
}

The output

(thank,1)
(spark,1)
(and,1)
(java,1)
(storm,1)
(flink,1)
(fine,1)
(you,3)
(scala,1)
(are,1)
(how,1)
(hello,5)

Note: if running the job fails with the error No ExecutorFactory found to execute the application, the flink-clients dependency is missing. Since Flink 1.11, flink-clients is no longer a transitive dependency of flink-streaming-java and has to be declared explicitly, as in the snippet below.
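A minimal fix, assuming the POM shown above: add the client dependency next to the other Flink dependencies.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>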

Stream processing implementation

Building on the batch implementation above, create a new class and make the following changes.

  • Batch processing => wait until a group of data (or all of it) has arrived, then process it; stream processing => process each record as soon as it arrives instead of letting data pile up to a certain volume first
  • Instead of batch's groupBy => processing all data in one go, streaming uses keyBy => each record is hashed, similar to a partitioning operation, and processed as it arrives, so every intermediate result is output!
  • Parallelism: in the development environment the parallelism defaults to the number of CPU cores on the machine (see the sketch after this list)
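A minimal sketch of setting the parallelism explicitly (this snippet is an assumption for illustration; it is not part of the original program):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With parallelism 1 the whole job runs in a single task: output order matches input
// order and print() omits the "n>" subtask prefix.
env.setParallelism(1);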

Code implementation

package com.caicai;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author wangcaicai
 * @date 2021/11/18 0:51
 * @description stream processing
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism; the default is the number of CPU cores on the current machine (set it to 1 for single-threaded processing)
        // env.setParallelism(4);

        // Read data from a file
        String inputPath = "E:\\java\\WorkSpace\\Flink\\src\\main\\resources\\hello.txt";
        DataStream<String> inputDataStream = env.readTextFile(inputPath);

        // Split each line on spaces to expand it into words, and convert each word into a (word, 1) two-tuple for counting
        // Group by the word at position 0
        // Sum the counts at position 1
        DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new MyFlatMapper())
                .keyBy(item -> item.f0)
                .sum(1);
        resultStream.print();

        // Execute the task
        env.execute();
    }

    // Implement the FlatMapFunction interface
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the line on spaces
            String[] words = s.split(" ");
            // Iterate over all the words and emit each one as a two-tuple
            for (String str : words) {
                out.collect(new Tuple2<>(str, 1));
            }
        }
    }
}

Output:

In this case, since it is stream processing, every intermediate result is printed, and the number in front is the index of the parallel subtask (thread) that produced the output.

9> (how,1)
1> (scala,1)
6> (storm,1)
6> (are,1)
4> (hello,1)
4> (hello,2)
2> (java,1)
4> (hello,3)
1> (spark,1)
7> (you,1)
10> (flink,1)
4> (hello,4)
4> (hello,5)
7> (fine,1)
7> (you,2)
5> (thank,1)
11> (and,1)
7> (you,3)

Streaming data source testing

  1. Use nc -lk <port> to open a socket service to simulate real-time streaming data:
nc -lk 7777
  2. Modify the input-stream part of the code:
package com.caicai;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author wangcaicai
 * @date 2021/11/18 0:51
 * @description stream processing
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism; the default is the number of CPU cores on the current machine (set it to 1 for single-threaded processing)
        // env.setParallelism(32);

        // Read data from a file
        // String inputPath = "E:\\java\\WorkSpace\\Flink\\src\\main\\resources\\hello.txt";
        // DataStream<String> inputDataStream = env.readTextFile(inputPath);

        // Read data from the socket text stream
        DataStream<String> inputDataStream = env.socketTextStream("192.168.200.130", 7777);

        // Split each line on spaces to expand it into words, and convert each word into a (word, 1) two-tuple for counting
        // Group by the word at position 0
        // Sum the counts at position 1
        DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new MyFlatMapper())
                .keyBy(item -> item.f0)
                .sum(1);
        resultStream.print();

        // Execute the task
        env.execute();
    }

    // Implement the FlatMapFunction interface
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the line on spaces
            String[] words = s.split(" ");
            // Iterate over all the words and emit each one as a two-tuple
            for (String str : words) {
                out.collect(new Tuple2<>(str, 1));
            }
        }
    }
}
  3. Enter data in the locally opened socket and observe the console output in IDEA.

    Key => hashCode: the hash value of a given key is fixed, so the key is always assigned to the same thread for processing (see the sketch below).
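A rough illustration of the idea (the class and the modulo routing below are simplifications for illustration; Flink actually maps key.hashCode() through a murmur hash into a key group before assigning it to a subtask):

public class KeyRoutingSketch {
    public static void main(String[] args) {
        int parallelism = 12;
        // The same key always produces the same hash, so it always lands on the same
        // subtask, and its counts accumulate in one place.
        for (String key : new String[]{"hello", "flink", "hello"}) {
            int subtask = Math.abs(key.hashCode()) % parallelism;
            System.out.println(key + " -> subtask " + subtask);
        }
    }
}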

Optimization

In the code above, the host and port are hard-coded, which is bad practice. We can instead pass them as program arguments (args) and use the ParameterTool utility to extract these configuration items.

// The changed part
import org.apache.flink.api.java.utils.ParameterTool;
// Use the parameter tool to extract configuration items from program startup parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
int port = parameterTool.getInt("port");
// Read data from the socket text stream
DataStream<String> inputDataStream = env.socketTextStream(host, port);
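ParameterTool can also fall back to a default when a flag is missing; a small sketch (the default values here are assumptions for illustration):

// Fall back to localhost:7777 when --host/--port are not supplied
String host = parameterTool.get("host", "localhost");
int port = parameterTool.getInt("port", 7777);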

Set the configuration items in the program arguments (args) of the run configuration.

Input:

--host 192.168.200.130 --port 7777

Then click Apply in the lower right corner and run the program again
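As a quick sanity check (the output below is illustrative only; the subtask numbers depend on your machine), typing a line such as hello flink into the nc session should immediately print updated counts in the IDEA console:

# nc session
hello flink

# IDEA console (subtask numbers will vary)
3> (hello,1)
8> (flink,1)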