Preface

This series of articles starts from a hot-word statistics requirement and explains how Flink is applied, deployed, and made fault tolerant in a real project. The project code is written in Scala, for no deeper reason than that the language looks more concise and interoperates well with Java, so Java classes can be referenced directly.

Defining data formats

This article covers the core of the hot-word statistics feature: each message delivered to the program is a searched word, and the program counts them. Based on this, we define the message format as ${timestamp},${word}. The timestamp marks when the search happened and is used for watermarks in Flink.
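As a minimal sketch of the ${timestamp},${word} format, the plain-Scala parser below turns one message into a (timestamp, word) tuple. The names SearchEventParser and parse are hypothetical and do not come from the project; returning None for malformed input is an assumption about how bad messages might be handled.

```scala
object SearchEventParser {
  // Parses one "${timestamp},${word}" message into a (timestamp, word) tuple.
  // Returns None for malformed input instead of throwing (an assumed policy).
  def parse(line: String): Option[(Long, String)] =
    line.split(",", 2) match {
      case Array(ts, word) if word.nonEmpty =>
        scala.util.Try(ts.trim.toLong).toOption.map(t => (t, word))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    println(parse("1556680015646,word1"))   // a well-formed message
    println(parse("not-a-timestamp,word1")) // rejected: timestamp is not a number
  }
}
```

Splitting with a limit of 2 keeps any further commas inside the word, which seems a reasonable choice if search terms may contain commas.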

Offline data simulation

Once the data format is defined, each message can be represented as a Tuple in Scala. We first use the fromElements API to create offline data for debugging:

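import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val currentTimeStamp = System.currentTimeMillis()
// Four sample search events as (timestamp, word) tuples
val dataStream = env.fromElements(
  (currentTimeStamp, "word1"),
  (currentTimeStamp + 1, "word2"),
  (currentTimeStamp + 2, "word1"),
  (currentTimeStamp + 3, "word3"))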

The calculation

Flink provides many operators [flink-1.8 / operator]; for counting we use sum. Before sum can be executed, keyBy must be called to convert the DataStream into a KeyedStream. We have two fields, the timestamp and the word. To key by the word we pass 1 to keyBy (indices start at 0), but what do we pass to sum? There is no numeric field to add up yet, so we first map each record to ${timestamp},${word},${count} with an initial count of 1, and then sum the field at index 2. This analysis leads to the following code:

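dataStream
  .map(x => (x._1, x._2, 1)) // append an initial count of 1
  .keyBy(1)                  // key by the word (index 1)
  .sum(2)                    // rolling sum of the count (index 2)
  .print()

env.execute() // the job only runs once execute() is called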
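To make the behaviour of map, keyBy and sum easier to follow, here is a plain-Scala simulation that does not use Flink at all. RollingSumSketch and rollingCounts are hypothetical names. The sketch assumes that the non-summed fields keep the values of the first record seen for a key, which is what the console output in this article suggests.

```scala
object RollingSumSketch {
  // Simulates keyBy(word) followed by a rolling sum of the count field.
  // Every input record produces exactly one output record.
  def rollingCounts(events: Seq[(Long, String)]): Seq[(Long, String, Int)] = {
    // Per-key state: (timestamp of the first record, running count)
    val state = scala.collection.mutable.Map.empty[String, (Long, Int)]
    events.map { case (ts, word) =>
      val (firstTs, count) = state.getOrElse(word, (ts, 0))
      state(word) = (firstTs, count + 1)
      (firstTs, word, count + 1)
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq((100L, "word1"), (101L, "word2"), (102L, "word1"), (103L, "word3"))
    rollingCounts(events).foreach(println)
  }
}
```

The second word1 record yields a count of 2 while the first word1 output has already been emitted, so word1 appears twice in the result.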

For debugging, the result is printed to the console:

6> (1556680015646,word2,1)
12> (1556680015648,word3,1)
10> (1556680015647,word1,1)
10> (1556680015647,word1,2)

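The number at the beginning of each line identifies the parallel subtask (thread) that produced the record. word2 and word3 were handled by different subtasks, while both word1 records were handled by the same one, because keyBy sends all records with the same key to the same subtask.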
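Why records with the same word always land on the same thread can be illustrated with a deliberately simplified sketch. It assumes plain "hash of the key modulo parallelism" routing; Flink's real assignment goes through key groups and a different hash function, so the subtask numbers computed here will not match the console output. KeyRoutingSketch and subtaskFor are hypothetical names.

```scala
object KeyRoutingSketch {
  // Simplified stand-in for key-to-subtask routing (NOT Flink's actual algorithm):
  // the same key always maps to the same subtask index in [0, parallelism).
  def subtaskFor(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    val parallelism = 12 // assumed value for illustration
    Seq("word1", "word2", "word1", "word3").foreach { word =>
      println(s"$word -> subtask ${subtaskFor(word, parallelism)}")
    }
  }
}
```

Because the routing depends only on the key, per-key state such as the running count can live on a single subtask without any coordination between threads.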

Q&A

  1. Why are there two outputs for word1? Because this is real-time stream processing: each operator processes every record as it arrives, so every input record produces a corresponding output.
  2. So when does the number of outputs go down? When windows are used. Multiple records are aggregated into a window according to some rule (by count or by time period), and each window produces only one output.
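The effect of windows on the number of outputs can be sketched in plain Scala, again without Flink. The example assumes fixed-size (tumbling) time windows aligned to multiples of the window size; TumblingWindowSketch, windowedCounts and windowSizeMs are hypothetical names.

```scala
object TumblingWindowSketch {
  // Groups events into fixed-size time windows and emits ONE count
  // per (window start, word) instead of one output per input record.
  def windowedCounts(events: Seq[(Long, String)], windowSizeMs: Long): Seq[(Long, String, Int)] =
    events
      .groupBy { case (ts, word) => (ts - ts % windowSizeMs, word) }
      .map { case ((windowStart, word), group) => (windowStart, word, group.size) }
      .toSeq
      .sortBy { case (windowStart, word, _) => (windowStart, word) }

  def main(args: Array[String]): Unit = {
    val events = Seq((1000L, "word1"), (1001L, "word2"), (1002L, "word1"), (1003L, "word3"))
    // All four events fall into the window starting at 1000,
    // so word1 is reported once with a count of 2.
    windowedCounts(events, 1000L).foreach(println)
  }
}
```

In a Flink 1.8 job the corresponding step would be to insert a window operator such as timeWindow or countWindow between keyBy and sum; this sketch only mimics the result for a finite list of events.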

Tips

  1. The Flink project here is written in Scala. Be sure to add the following plugin to the pom.xml file to enable Scala compilation:
<plugins>
    <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <executions>
            <!-- Run scala compiler in the process-resources phase, so that dependencies on scala classes can be resolved later in the (Java) compile phase -->
            <execution>
                <id>scala-compile-first</id>
                <phase>process-resources</phase>
                <goals>
                    <goal>compile</goal>
                </goals>
            </execution>
            <!-- Run scala compiler in the process-test-resources phase, so that dependencies on scala classes can be resolved later in the (Java) test-compile phase -->
            <execution>
                <id>scala-test-compile</id>
                <phase>process-test-resources</phase>
                <goals>
                    <goal>testCompile</goal>
                </goals>
            </execution>
        </executions>
        <configuration>
            <jvmArgs>
                <jvmArg>-Xms128m</jvmArg>
                <jvmArg>-Xmx512m</jvmArg>
            </jvmArgs>
        </configuration>
    </plugin>
</plugins>
  2. In the project dependencies, use the Flink artifacts that carry a Scala version suffix, for example:
<properties>
    <scala.binary.version>2.11</scala.binary.version>
    <flink.version>1.8.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
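  3. Imports in your source file: when you type StreamExecutionEnvironment, IntelliJ will suggest importing org.apache.flink.streaming.api.scala.StreamExecutionEnvironment. With only that class imported, calling env.fromElements() produces a type error from the compiler, because the implicit TypeInformation it needs is not in scope. Import the whole package instead: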

import org.apache.flink.streaming.api.scala._
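Why the wildcard import matters can be shown with a small plain-Scala analogy. Everything in it is a hypothetical stand-in: TypeInfo plays the role of Flink's TypeInformation, the api object plays the role of the org.apache.flink.streaming.api.scala package, and Env plays the role of StreamExecutionEnvironment. It is a sketch of the mechanism, not Flink's actual implementation.

```scala
object ImplicitImportSketch {
  // Hypothetical stand-in for Flink's TypeInformation[T]
  trait TypeInfo[T] { def name: String }

  // Hypothetical stand-in for the org.apache.flink.streaming.api.scala package
  object api {
    // The implicit value lives next to the class, not inside it
    implicit val stringInfo: TypeInfo[String] =
      new TypeInfo[String] { val name = "String" }

    class Env {
      // Requires an implicit TypeInfo[T] at the call site
      def fromElements[T](data: T*)(implicit info: TypeInfo[T]): String =
        s"${data.size} element(s) of type ${info.name}"
    }
  }

  def demo(): String = {
    import api._ // importing only api.Env would leave stringInfo out of scope
    new Env().fromElements("word1", "word2")
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Replacing `import api._` with `import api.Env` makes the call fail to compile with a missing-implicit error, which mirrors the situation described in the tip.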

Closing

This article does not cover much ground, but for readers who are not yet familiar with Flink or Scala I have included the code and spelled out the details of the reasoning. Some of it may be repetitive and some of the wording may be clumsy, so please bear with me. If you have any suggestions, feel free to reach out in the comments section; that is what motivates me to keep updating.

Oh, and I almost forgot to post the code: [github.com/Wing-Lo/fli…]