1. The Flink execution model

A Flink program is mainly composed of three parts: Source, Transformation, and Sink. The Source reads the data in, Transformations process the data, and the Sink writes the results out.

2. Flink program architecture

Each Flink program consists of the following basic steps:

  • Obtain an execution environment; (Execution Environment)
  • Load/create the initial data; (Source)
  • Specify transformations on the data; (Transformation)
  • Specify where to put the results of the computation; (Sink)
  • Trigger program execution.
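Put together, a minimal sketch of these five steps looks like the following word count (the input path and job name are placeholders):

import org.apache.flink.streaming.api.scala._

// 1. Obtain an execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. Load the initial data (Source); the path is a placeholder
val source = env.readTextFile("/path/to/input.txt")
// 3. Specify transformations on the data (Transformation)
val counts = source
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .keyBy(0)
  .sum(1)
// 4. Specify where to put the results (Sink)
counts.print()
// 5. Trigger program execution
env.execute("SkeletonJob")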

3. Environment

The execution environment StreamExecutionEnvironment is the foundation of every Flink program.

There are three ways to create an execution environment:

StreamExecutionEnvironment.getExecutionEnvironment 
StreamExecutionEnvironment.createLocalEnvironment 
StreamExecutionEnvironment.createRemoteEnvironment

3.1 StreamExecutionEnvironment.getExecutionEnvironment

Creates an execution environment that represents the context in which the current program executes. If the program is invoked standalone, this method returns a local execution environment; if the program is submitted to a cluster from a command-line client, this method returns that cluster's execution environment. In other words, getExecutionEnvironment decides which execution environment to return based on how the program is run, which makes it the most common way to create one.

val env = StreamExecutionEnvironment.getExecutionEnvironment

3.2 StreamExecutionEnvironment.createLocalEnvironment

Returns a local execution environment; you can specify the default degree of parallelism when calling it.

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

3.3 StreamExecutionEnvironment.createRemoteEnvironment

Returns a cluster execution environment and submits the Jar to the remote server. When calling it, you need to specify the host and port of the JobManager, as well as the Jar package(s) to run on the cluster.

// Host, port, and jar path below are placeholders.
val env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 6123, "/opt/modules/your-job.jar")

4. Source

4.1 File-based data source

  • readTextFile(path)

Reads a text file that conforms to the TextInputFormat specification, line by line, and returns each line as a String.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("/opt/modules/test.txt")
stream.print()
env.execute("FirstJob")
  • readFile(fileInputFormat, path)

Reads files in the specified file format.

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path

val env = StreamExecutionEnvironment.getExecutionEnvironment
val path = new Path("/opt/modules/test.txt")
val stream = env.readFile(new TextInputFormat(path), "/opt/modules/test.txt")
stream.print()
env.execute("FirstJob")

4.2 Socket-based data source

  • socketTextStream

Reads data from a socket; elements can be separated by a delimiter.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("localhost", 11111)
stream.print()
env.execute("FirstJob")

4.3 Data source based on Collection

  • fromCollection(seq)

Creates a data stream from a collection; all elements must be of the same type.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = List(1, 2, 3, 4)
val stream = env.fromCollection(list)
stream.print()
env.execute("FirstJob")
  • fromCollection(Iterator)

Creates a data stream from an Iterator; the element type is given by the class of the elements the iterator returns.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val iterator = Iterator(1, 2, 3, 4)
val stream = env.fromCollection(iterator)
stream.print()
env.execute("FirstJob")
  • fromElements(elements:_*)

Creates a data stream from a given sequence of objects, all of which must be of the same type.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = List(1, 2, 3, 4)
val stream = env.fromElements(list)
stream.print()
env.execute("FirstJob")
  • generateSequence(from, to)

Produces a sequence of numbers from the given interval, in parallel.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10)
stream.print()
env.execute("FirstJob")

5. Sink

A Data Sink consumes the data in a DataStream and forwards it to a file, a socket, or an external system, or prints it out.

Flink has a number of built-in output formats wrapped in DataStream operations.

5.1 writeAsText

Writes elements line by line as strings (TextOutputFormat); each string is obtained by calling the element's toString() method.

5.2 writeAsCsv

Writes tuples to files in comma-separated format (CsvOutputFormat); the row and field delimiters are configurable. The value of each field comes from the object's toString() method.

5.3 print / printToErr

Prints the result of each element's toString() method to standard output or standard error. Optionally, you can provide a prefix for the output to distinguish different print calls; if the parallelism is greater than 1, the output is also prefixed with a flag identifying which task produced it.

5.4 writeUsingOutputFormat

A method for custom file output; the base class (FileOutputFormat) supports custom object-to-bytes conversion.

5.5 writeToSocket

Writes elements to a socket according to a SerializationSchema.
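A minimal sketch exercising these sinks (the output paths, host, and port are placeholders, and the SimpleStringSchema import path may differ between Flink versions):

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")

// writeAsText: one line per element, via toString()
stream.writeAsText("/opt/modules/out-text")
// writeAsCsv: requires a tuple stream; rows and fields use the default delimiters
stream.map(item => (item, 1)).writeAsCsv("/opt/modules/out-csv")
// print with a prefix to tell this call's output apart from other print calls
stream.print("sink-demo")
// writeToSocket: serialize elements with a SerializationSchema before sending
stream.writeToSocket("localhost", 11111, new SimpleStringSchema())

env.execute("SinkJob")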

6. Transformation

6.1 Map

DataStream → DataStream: takes one element and produces one element.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10)
val streamMap = stream.map { x => x * 2 }
streamMap.print()
env.execute("FirstJob")

Note: with stream.print(), the number before each output line indicates which parallel task produced it; for example, 3> 8 means parallel task 3 emitted the value 8.

6.2 FlatMap

DataStream → DataStream: takes one element and produces zero, one, or more elements.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap {
    x => x.split(" ")
}
streamFlatMap.print()
env.execute("FirstJob")

6.3 Filter

DataStream → DataStream: evaluates a Boolean function for each element and keeps the elements for which it returns true. The following example keeps only the elements equal to 1:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10)
val streamFilter = stream.filter { x => x == 1 }
streamFilter.print()
env.execute("FirstJob")

6.4 Connect

DataStream, DataStream → ConnectedStreams: connects two streams while preserving their types. After connecting, the two streams are placed inside a single ConnectedStreams, but each keeps its own data and element type; the two streams remain internally independent.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test.txt")
val streamMap = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop"))
val streamCollect = env.fromCollection(List(1, 2, 3, 4))
val streamConnect = streamMap.connect(streamCollect)
streamConnect.map(item => println(item), item => println(item))

env.execute("FirstJob")

6.5 CoMap and CoFlatMap

ConnectedStreams → DataStream: operates on a ConnectedStreams and works like map and flatMap, applying one function to each of the two streams in the ConnectedStreams.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream1 = env.readTextFile("test.txt")
val streamFlatMap = stream1.flatMap(x => x.split(" "))
val stream2 = env.fromCollection(List(1, 2, 3, 4))
val streamConnect = streamFlatMap.connect(stream2)
val streamCoMap = streamConnect.map(
    (str) => str + "connect",
    (in) => in + 100
)
 
env.execute("FirstJob")

The flatMap version works the same way:

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream1 = env.readTextFile("test.txt")
val stream2 = env.readTextFile("test1.txt")
val streamConnect = stream1.connect(stream2)
val streamCoFlatMap = streamConnect.flatMap(
    (str1) => str1.split(" "),
    (str2) => str2.split(" ")
)
streamCoFlatMap.print()
 
env.execute("FirstJob")

6.6 Split

DataStream → SplitStream: splits one DataStream into two or more DataStreams according to some criterion.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
  num =>
    // Words equal to "hadoop" go to one output; all other words go to another
    (num.equals("hadoop")) match {
        case true => List("hadoop")
        case false => List("other")
    }
)
 
env.execute("FirstJob")

6.7 Select

SplitStream → DataStream: selects one or more DataStreams from a SplitStream.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
  num =>
    (num.equals("hadoop")) match{
        case true => List("hadoop")
        case false => List("other")
    }
)
 
val hadoop = streamSplit.select("hadoop")
val other = streamSplit.select("other")
hadoop.print()
 
env.execute("FirstJob")

6.8 Union

DataStream → DataStream: unions two or more DataStreams into a new DataStream containing all the elements of all the streams. Note: if you union a DataStream with itself, every element appears twice in the new DataStream.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream1 = env.readTextFile("test.txt")
val streamFlatMap1 = stream1.flatMap(x => x.split(" "))
val stream2 = env.readTextFile("test1.txt")
val streamFlatMap2 = stream2.flatMap(x => x.split(" "))
val streamUnion = streamFlatMap1.union(streamFlatMap2)
streamUnion.print()
 
env.execute("FirstJob")

6.9 KeyBy

DataStream → KeyedStream: logically partitions a stream into disjoint partitions, where each partition contains elements with the same key; internally this is implemented with hash partitioning. When keying by field position, as below, the input must be a Tuple type.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap{
    x => x.split(" ")
}
val streamMap = streamFlatMap.map{
    x => (x,1)
}
val streamKeyBy = streamMap.keyBy(0)
env.execute("FirstJob")

6.10 Reduce

KeyedStream → DataStream: a rolling aggregation on a keyed (grouped) DataStream. It combines the current element with the last aggregated value and emits the new value, so the resulting stream contains the result of every aggregation step rather than only the final result.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)
 
val streamReduce = stream.reduce(
  (item1, item2) => (item1._1, item1._2 + item2._2)
)
 
streamReduce.print()
 
env.execute("FirstJob")

6.11 Fold

KeyedStream → DataStream: a rolling fold on a keyed DataStream, with an initial value. It combines the current element with the previous fold result and emits the new value, so the resulting stream contains every intermediate fold result rather than only the final one.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)

val streamFold = stream.fold(100)(
  (begin, item) => (begin + item._2)
)

streamFold.print()
 
env.execute("FirstJob")

6.12 Aggregations

KeyedStream → DataStream: rolling aggregations on a keyed DataStream. The difference between min and minBy is that min returns only the minimum value, whereas minBy returns the whole element that holds the minimum value in the given field (max and maxBy work the same way). As with reduce, the resulting stream contains the result of every aggregation step, not only the final one.

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
 
val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test02.txt").map(item => (item.split("")(0), item.split("")(1).toLong)).keyBy(0)
 
val streamReduce = stream.sum(1)
 
streamReduce.print()
 
env.execute("FirstJob")

The operators before Reduce (6.10) can be applied directly to a stream because they are not aggregation-type operations. From 6.10 onward, however, you will find that although an aggregation operator can be applied directly to an unbounded stream, it emits the running aggregation result for every incoming element, which is usually not what we want. In practice, aggregation operators such as reduce, fold, and the aggregations are used together with Window; only combined with a Window do they produce the desired result.
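
As a sketch of what combining an aggregation with a Window looks like (the socket source and the 5-second tumbling window are arbitrary choices for illustration):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val streamReduce = env.socketTextStream("localhost", 11111)
  .flatMap(item => item.split(" "))
  .map(item => (item, 1))
  .keyBy(0)
  // aggregate within 5-second tumbling windows instead of over the unbounded stream
  .timeWindow(Time.seconds(5))
  .reduce((item1, item2) => (item1._1, item1._2 + item2._2))
streamReduce.print()
env.execute("WindowWordCount")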