1. An overview of Flink
1.1 Evolution of stream processing technology
In the open source world, the Apache Storm project was a pioneer in stream processing. Storm was first developed by Nathan Marz and a team at the startup BackType before being accepted by the Apache Software Foundation. Storm provides low-latency streaming, but its real-time performance comes at a price: high throughput is difficult to achieve, and correctness falls short of what is usually required. In other words, Storm does not guarantee exactly-once semantics, and even at the level of correctness it can guarantee, the overhead is considerable.
Maintaining good fault tolerance in a stream processing system that is both low-latency and high-throughput is very hard, so an alternative approach was devised to obtain guaranteed correctness: split the continuous stream into a series of tiny batch jobs. If the batches are small enough (so-called micro-batch jobs), the computation comes close to true stream processing. Some latency is unavoidable, so fully real-time processing cannot be achieved, but a simple application can still reach a latency of a few seconds or even sub-second. This is the approach taken by Spark Streaming, which runs on the Spark batch engine.
More importantly, the micro-batch approach makes it possible to implement exactly-once semantics and guarantee state consistency: if a micro-batch fails, it can simply be re-run, which is easier than with a continuous-stream approach. Storm Trident is an extension of Storm whose underlying stream processing engine computes on micro-batches; it achieves exactly-once semantics, but at a significant cost in latency.
Micro-batch strategies such as Storm Trident and Spark Streaming can only split streams at multiples of the batch interval; they cannot separate event data according to its actual characteristics. Moreover, for latency-sensitive jobs, developers often have to spend considerable effort tuning performance in their business code. These limitations in flexibility and expressiveness lead to slow development and high operation and maintenance costs for micro-batch strategies.
Flink is a framework that avoids these drawbacks: it offers rich functionality and can efficiently process continuous sequences of events. Some of Flink's features are shown below:
1.2 Getting to know Flink
Flink grew out of the Stratosphere project, a research project carried out between 2010 and 2014 by three Berlin-based universities and several other European universities. In April 2014 the Stratosphere code was forked and donated to the Apache Software Foundation; the initial members of the incubator project were the core developers of the Stratosphere system. In December 2014, Flink became an Apache top-level project.
In German, "flink" means quick and nimble, and the project chose a colorful squirrel as its logo: not only are squirrels fast and agile, but the squirrels in Berlin have an attractive reddish-brown color. The tail of Flink's squirrel logo echoes the colors of the Apache Software Foundation logo, marking it as an Apache-style squirrel.
At the top of the Flink home page, the project's tagline is displayed: "Apache Flink is an open-source stream processing framework for distributed, high-performance, ready-to-use, and accurate stream processing applications."
Apache Flink is a framework and distributed processing engine for stateful computation over unbounded and bounded data streams. Flink is designed to run in all common cluster environments, performing computations at in-memory speed and at any scale.
1.3 Flink core computing framework
The core computing architecture of Flink is the Flink Runtime execution engine shown below, which is a distributed system capable of accepting data-streaming programs and executing them in a fault-tolerant manner on one or more machines.
The Flink Runtime execution engine can run as an application of YARN (Yet Another Resource Negotiator) on a cluster, on a Mesos cluster, or on a standalone machine (which is useful for debugging a Flink application).
The figure above shows the core components of Flink's technology stack. It is worth noting that Flink provides a stream-oriented interface (DataStream API) and a batch-oriented interface (DataSet API), so Flink can do both stream and batch processing. Flink also supports extension libraries for machine learning (FlinkML), complex event processing (CEP), and graph computation (Gelly), as well as Table APIs for streaming and batch processing respectively.
Flink is a big data computing framework that truly unifies batch and stream processing. By consolidating big data computation in one engine, it not only reduces the difficulty of learning and operating the system, but also effectively unifies offline and real-time computing.
Programs that the Flink Runtime execution engine can accept are powerful, but they are verbose and laborious to write. For this reason, Flink provides APIs wrapped on top of the Runtime execution engine to help users generate streaming programs more easily: the DataStream API for stream processing and the DataSet API for batch processing. It is worth noting that although the Flink Runtime execution engine is based on stream processing, the DataSet API was developed before the DataStream API, because industry demand for unbounded stream processing was not that great when Flink was born.
The DataStream API is used to analyze unbounded data streams and can be used from Java or Scala. Developers work with a data structure called DataStream, which represents a never-ending, distributed stream of data.
Flink’s distributed nature is reflected in its ability to run on hundreds or thousands of machines, which divide large computing tasks into many smaller parts, with each machine performing part of the task. Flink automatically ensures that calculations continue in the event of machine failures or other errors, or are scheduled to be performed again after bug fixes or version upgrades. This capability allows developers to avoid having to worry about runtime failures. Flink essentially uses fault-tolerant data streams, which allow developers to analyze data that is continuously generated and never ends (i.e., stream processing).
2. Flink basic architecture
2.1 JobManager and TaskManager
The Flink runtime contains two types of processors:
**JobManager processes:** also known as Masters, coordinate distributed execution. They schedule tasks, coordinate checkpoints, coordinate recovery on failures, and so on. A Flink setup runs at least one JobManager; in high-availability mode there may be several, one of which is the leader while the others are standbys.
**TaskManager processes:** also known as Workers, execute the tasks (or, more precisely, subtasks) of a dataflow, and buffer and exchange data streams. A Flink setup runs at least one TaskManager.
A simplified diagram is shown below.
Master and Worker processors can be started directly on a physical machine or through a resource scheduling framework such as YARN.
The Worker connects to the Master, announces its availability, and is then assigned tasks.
2.2 Unbounded and bounded data streams
Flink is used to handle bounded and unbounded data:
Unbounded data streams: unbounded streams have a start but no end; they never terminate and keep supplying data as it is generated, so they must be processed continuously, that is, events must be handled promptly after they are ingested. We cannot wait for all data to arrive, because the input is unbounded and will never be complete at any point in time. Processing unbounded data usually requires that events be ingested in a particular order (for example, the order in which they occurred) so that result completeness can be reasoned about. Processing of unbounded streams is called stream processing.
Bounded data streams: a bounded stream has a well-defined beginning and end, and can be processed by fetching all the data before performing any computation. Ordered ingestion is not required, because a bounded data set can always be sorted. Processing of bounded streams is also known as batch processing.
In discussing unbounded and bounded data streams we mentioned stream processing and batch processing, the two common ways of processing data in big data systems.
Batch processing is characterized by bounded, persistent, large-volume data. It is ideal for computations that need access to a complete set of records and is generally used for offline statistics. Stream processing is characterized by unbounded, real-time data: instead of operating on an entire data set, it operates on each item as it passes through the system, and is generally used for real-time statistics.
In the Spark ecosystem, different technical frameworks are used for batch processing and stream processing. Batch processing is implemented by SparkSQL, and stream processing is implemented by Spark Streaming. This is also the strategy adopted by most frameworks, using independent processors to implement batch processing and stream processing respectively. Flink can do both batch and stream processing.
How does Flink implement batch and stream processing at the same time? The answer is that Flink treats batch processing (that is, processing a finite amount of static data) as a special kind of stream processing.
Apache Flink is an open source computing platform for distributed stream and batch data processing. It supports both stream and batch applications on top of the same Flink Runtime. Existing open source solutions treat stream processing and batch processing as two different application types because their goals differ: stream processing generally needs low latency and exactly-once guarantees, while batch processing needs high throughput and efficient processing. They are therefore usually implemented separately, or each is covered by a separate open source framework: for example, MapReduce, Tez, Crunch, and Spark for batch processing, and Samza and Storm for stream processing.
Flink's implementation of stream and batch processing differs fundamentally from some traditional solutions: it looks at both from another perspective and unifies them. Flink fully supports stream processing, meaning that when viewed as stream processing the input data stream is unbounded; batch processing is treated as a special kind of stream whose input is defined as bounded. On top of the same Flink Runtime, Flink provides the streaming API and the batch API respectively, and these are the foundation on which the higher-level stream-oriented and batch-oriented application frameworks are built.
2.3 Data stream programming model
Flink provides different levels of abstraction to develop stream or batch jobs, as shown below:
The lowest level of abstraction offers only stateful streams; it is embedded into the DataStream API through the Process Function. The low-level Process Function integrates with the DataStream API and provides primitives for specific operations: users can freely process events from one or more data streams with consistent, fault-tolerant state. In addition, users can register event-time and processing-time callbacks, which allows programs to implement complex computations.
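As a rough, hypothetical sketch (the class, stream, and message format are invented for illustration, not taken from the original text), a Process Function might be used like this:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Low-level processing: emit a message for each event, with access to its timestamp
class StampedLogger extends ProcessFunction[String, String] {
  override def processElement(value: String,
                              ctx: ProcessFunction[String, String]#Context,
                              out: Collector[String]): Unit = {
    // ctx also exposes timer services for event-time / processing-time callbacks
    out.collect(s"got '$value' at ${ctx.timestamp()}")
  }
}

// usage (assuming dataStream: DataStream[String]):
// dataStream.process(new StampedLogger)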
In practice, most applications do not need the low-level abstraction described above and instead program against the Core APIs, namely the DataStream API (bounded or unbounded streams) and the DataSet API (bounded data sets). These APIs provide common building blocks for data processing, such as user-defined transformations, joins, aggregations, windows, and so on. The DataSet API additionally supports constructs for bounded data sets, such as loops and iterations. The data types handled by these APIs are represented as classes in the respective programming languages.
The Table API is centered on tables, which may change dynamically (when they represent streaming data). The Table API follows the (extended) relational model: a table has a two-dimensional structure (similar to a table in a relational database), and the API offers comparable operations such as select, project, join, group-by, aggregate, and so on. A Table API program declaratively defines what logical operation should be performed rather than specifying exactly what the code for that operation looks like. Although the Table API can be extended with many kinds of user-defined functions (UDFs), it is less expressive than the Core APIs, but much more concise to use (less code to write). In addition, Table API programs are optimized by a built-in optimizer before execution.
You can seamlessly switch between tables and DataStream/DataSet to allow applications to mix the Table API with DataStream and DataSet.
The highest level of abstraction Flink provides is SQL. This layer is similar to the Table API in syntax and expressiveness, but represents programs as SQL query expressions. The SQL abstraction interacts closely with the Table API, and SQL queries can be executed directly on tables defined with the Table API.
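For a feel of how the Table API and SQL layers relate, here is a minimal sketch, assuming the flink-table dependency is on the classpath; the table name, fields, and query are made up, and the details of the Table API vary between Flink versions:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Register a (word, cnt) stream as a dynamic table and query it with SQL
val words = env.fromElements(("spark", 1), ("flink", 1), ("flink", 1))
tableEnv.registerDataStream("words", words, 'word, 'cnt)
val result = tableEnv.sqlQuery("SELECT word, SUM(cnt) FROM words GROUP BY word")

// The aggregate updates as new rows arrive, so convert to a retract stream
result.toRetractStream[(String, Int)].print()

env.execute("table-sql-sketch")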
3. Setting up a Flink cluster
The deployment options for Flink are:
Local, Standalone (low resource utilization), Yarn, Mesos, Docker, Kubernetes, AWS.
We mainly analyze Flink cluster deployment in Standalone mode and Yarn mode.
3.1 Standalone Mode Installation
We install the Flink cluster in standalone mode with three prepared virtual machines: one as the JobManager (hadoop101) and two as TaskManagers (hadoop102, hadoop103).
- First, download Flink from the official website.
- Send the downloaded package to the VMs and decompress it to the target location.
- Then modify the main configuration file (an example is sketched below):

[cris@hadoop101 conf]$ vim flink-conf.yaml
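The original post showed the edited file as a screenshot. As a rough sketch (the exact values are assumptions for this three-node setup), the relevant flink-conf.yaml entries for a standalone cluster look roughly like this:

# Address of the JobManager (master) node
jobmanager.rpc.address: hadoop101
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
# Number of slots offered by each TaskManager
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1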
- Then modify the Worker node configuration:

[cris@hadoop101 conf]$ vim slaves
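The slaves file simply lists the Worker hosts, one per line; for this setup it contains the two TaskManager nodes:

hadoop102
hadoop103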
- Finally, synchronize Flink to the other two Worker nodes:

[cris@hadoop101 module]$ xsync flink-1.6.1/
- The startup command is as follows:

[cris@hadoop101 bin]$ ./start-cluster.sh
Very simple ~
View the process status with jps:

[cris@hadoop101 bin]$ jpsall
----------jps of hadoop101---------
2491 StandaloneSessionClusterEntrypoint
2555 Jps
----------jps of hadoop102---------
2338 Jps
2285 TaskManagerRunner
----------jps of hadoop103---------
2212 Jps
2159 TaskManagerRunner
- Access the cluster web UI (port 8081).

If the following interface is displayed, the Flink cluster has started successfully.
- Run a simple WordCount task.
- Shut down the cluster:

[cris@hadoop101 bin]$ ./stop-cluster.sh
Stopping taskexecutor daemon (pid: 2285) on host hadoop102.
Stopping taskexecutor daemon (pid: 2159) on host hadoop103.
Stopping standalonesession daemon (pid: 2491) on host hadoop101.
[cris@hadoop101 bin]$ jpsall
----------jps of hadoop101---------
3249 Jps
----------jps of hadoop102---------
2842 Jps
----------jps of hadoop103---------
2706 Jps
3.2 Installation in Yarn Mode
The first four steps are the same as the Standalone mode
- The environment variable HADOOP_HOME has been set on the VMs.
- Start the Hadoop cluster (HDFS and YARN).
- Submit a yarn-session on hadoop101 by running the yarn-session.sh script in the bin directory of the installation directory:

[cris@hadoop101 ~]$ /opt/module/flink-1.6.1/bin/yarn-session.sh -n 2 -s 6 -jm 1024 -tm 1024 -nm test -d
Among them:
-n (--container): the number of TaskManagers.
-s (--slots): the number of slots per TaskManager. By default each slot uses one core, and the default number of slots per TaskManager is 1.
-jm: memory of the JobManager (in MB).
-tm: memory of each TaskManager (in MB).
-nm: the application name in YARN (the name shown on the YARN UI).
-d: run in detached (background) mode.
- After the yarn-session starts, you can view the submitted session on the YARN web page.

View the process information:
- Run a simple task:

[cris@hadoop101 flink-1.6.1]$ ./bin/flink run -m yarn-cluster examples/batch/WordCount.jar
The terminal directly prints the result
Look at the Web interface
4. Flink runtime architecture
4.1 Task Submission Process
After a Flink task is submitted, the client uploads the Flink jar package and configuration to HDFS and submits the task to the YARN ResourceManager. The ResourceManager allocates container resources and notifies a NodeManager to start the ApplicationMaster.
The ApplicationMaster loads the Flink jar package and configuration, builds the environment, and starts the JobManager. It then applies to the ResourceManager for resources to start the TaskManagers. After the ResourceManager allocates container resources, the ApplicationMaster notifies the NodeManagers on the corresponding nodes to start the TaskManagers.
Each NodeManager loads the Flink jar package and configuration, builds the environment, and starts a TaskManager. Once started, a TaskManager sends heartbeats to the JobManager and waits for it to assign tasks.
4.2 TaskManager and Slots
Each TaskManager is a JVM process that may execute one or more subtasks on separate threads. To control how many tasks a worker can accept, the worker uses task slots (each worker has at least one task slot).
Each task slot represents a fixed-size subset of the resources owned by the TaskManager. If a TaskManager has three slots, it divides its managed memory into thirds, one per slot. Slotting resources means that a subtask does not compete with subtasks from other jobs for managed memory; instead it has a certain amount of memory reserved. Note that there is no CPU isolation involved; slots currently only isolate the managed memory of tasks.
Adjusting the number of Task slots allows users to define how subtasks are isolated from each other. If a TaskManager has one slot, it means that each task group is running in a separate JVM (which may be launched from a specific container), while a TaskManager with multiple slots means that more subtasks can share the same JVM. Tasks within the same JVM process share TCP connections (based on multiplexing) and heartbeat messages. They may also share data sets and data structures, so this reduces the load on each task.
A task slot is a static concept: it describes the concurrency capability a TaskManager offers, configured via the parameter taskmanager.numberOfTaskSlots. Parallelism is a dynamic concept: it is the concurrency the TaskManagers actually use when running a program, configured via the parameter parallelism.default.
In other words, suppose there are 3 TaskManagers, each assigned 3 task slots, i.e. each TaskManager can accept 3 tasks, for a total of 9 task slots. If we set parallelism.default = 1, the default parallelism for running programs is 1, so only 1 of the 9 task slots is used and the other 8 stay idle.
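As a small illustrative sketch (the operator and values are arbitrary), besides the cluster-wide parallelism.default setting, parallelism can also be set per job or per operator in code:

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Job-level default parallelism (overrides parallelism.default from flink-conf.yaml)
env.setParallelism(3)

// Operator-level parallelism (overrides the job-level setting for this operator only)
val words = env.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .setParallelism(2)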
4.3 Dataflow
A Flink program is composed of three core components: Source, Transformation, and Sink. The Source is responsible for reading the data, the Transformation for transforming it, and the Sink for outputting the final data. The data flowing between these components is called streams.
The basic building blocks of a Flink program are streams and transformations (note that the DataSets used by Flink's DataSet API are internally streams as well). A stream can be viewed as an intermediate result, while a transformation is an operation that takes one or more streams as input and produces one or more result streams.
At run time, programs submitted to Flink are mapped to streaming dataflows, which consist of streams and transformation operators. Each dataflow starts with one or more sources and ends with one or more sinks. A dataflow resembles an arbitrary directed acyclic graph (DAG).
4.4 Parallel Data Flow
The execution of a Flink program is parallel and distributed. During execution, a stream has one or more stream partitions, and each operator has one or more operator subtasks. These subtasks run independently of one another, in different threads and possibly on different machines or in different containers.
The number of subtasks of a particular operator is called its parallelism. The parallelism of a stream is always the parallelism of its producing operator. Different operators of the same program may have different parallelism.
Data can be transmitted between two operators in a one-to-one (forwarding) pattern or in a redistributing pattern; which one is used depends on the type of operator.
One-to-one: streams (for example between the source and the map operator) preserve the partitioning and order of elements. That means a subtask of the map operator sees exactly the same elements, in the same order, as were produced by the corresponding subtask of the source operator. Operators such as map, filter, and flatMap have this one-to-one correspondence.
Redistributing: this pattern changes the partitioning of the data. Each operator subtask sends data to different target subtasks depending on the selected transformation; for example, keyBy() repartitions by hash code, broadcast() broadcasts, and rebalance() repartitions randomly. All of these cause a redistribution, which is similar to the shuffle process in Spark.
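A small sketch of the two patterns, with an invented socket source (host and port are placeholders):

val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)

// One-to-one (forwarding): source -> flatMap preserves partitioning and element order
val words = text.flatMap(_.split(" "))

// Redistributing: keyBy repartitions by the hash of the key,
// rebalance redistributes elements evenly across subtasks
val counts = words.map((_, 1)).keyBy(0).sum(1)
val balanced = words.rebalance.map(_.toUpperCase)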
4.5 Tasks and operator chains
For distributed execution, Flink chains operator subtasks together into tasks, and each task is executed by one thread. Chaining operators into tasks is an effective optimization: it reduces thread-to-thread handover and buffer exchange, increasing throughput while reducing latency. The chaining behavior can be configured in the programming APIs.
The following graph shows five subtasks executed in five parallel threads:
4.6 Task Scheduling Process
The Client is not part of the runtime and program execution; it is used to prepare the dataflow and send it to the Master (JobManager). Afterwards the client can disconnect, or stay connected to receive the results. The client can be run in two ways: either triggered as part of a Java/Scala program, or executed on the command line with ./bin/flink run.
5. Flink DataStream API
5.1 Flink operation model
The figure above shows Flink's execution model. A Flink program consists of three main parts: Source, Transformation, and Sink. The DataSource reads the data, the Transformation processes it, and the Sink outputs it.
5.2 Flink program architecture
Every Flink program consists of the same basic parts; a minimal skeleton follows the list:
- Obtain an execution environment (Execution Environment);
- Load/create the initial data (Source);
- Specify transformations on the data (Transformation);
- Specify where to put the results of the computation (Sink);
- Trigger program execution.
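A minimal sketch of these five steps (host, port, and job name are placeholders):

import org.apache.flink.streaming.api.scala._

object Skeleton {
  def main(args: Array[String]): Unit = {
    // 1. Obtain an execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Load/create the initial data (Source)
    val text: DataStream[String] = env.socketTextStream("localhost", 9999)
    // 3. Specify transformations on the data (Transformation)
    val counts = text.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    // 4. Specify where to put the results (Sink)
    counts.print()
    // 5. Trigger program execution
    env.execute("skeleton")
  }
}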
5.3 Environment
The execution environment StreamExecutionEnvironment is the foundation of all Flink programs.
There are three ways to create an execution environment:
StreamExecutionEnvironment.getExecutionEnvironment
StreamExecutionEnvironment.createLocalEnvironment
StreamExecutionEnvironment.createRemoteEnvironment
StreamExecutionEnvironment.getExecutionEnvironment
Creates an execution environment that represents the context in which the current program is executed. If the program is invoked standalone, this method returns a local execution environment; if the program was submitted to a cluster from a command-line client, it returns the cluster's execution environment. In other words, getExecutionEnvironment determines which execution environment to return based on how the program is run, and it is the most common way to create an execution environment.
val env = StreamExecutionEnvironment.getExecutionEnvironment
5.4 Source
1. File-based data sources

- readTextFile(path)

Reads a text file line by line according to the TextInputFormat specification and returns each line as a String.
object Test {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the Flink execution environment
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Read the text file at the specified path
    val stream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    // 3. Print the data in the DataStream
    stream.print()
    // 4. Start the Flink application
    executionEnvironment.execute("test")
  }
}
The terminal prints the result:

1> apache spark hadoop flume
1> kafka hbase hive flink
4> apache spark hadoop flink
5> kafka hbase hive flink
6> sqoop hue oozie zookeeper
8> apache spark hadoop flume
3> kafka hbase oozie zookeeper
2> sqoop hue oozie zookeeper
7> flink oozie azakaban spark

Note: with stream.print(), the number before each line indicates which parallel thread produced the output.
You can also read files based on the specified fileInputFormat
readFile(fileInputFormat, path)
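A brief sketch of readFile, assuming the usual org.apache.flink.streaming.api.scala._ import and reusing the test00.txt file from the example above:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path

val format = new TextInputFormat(new Path("test00.txt"))
val stream: DataStream[String] = executionEnvironment.readFile(format, "test00.txt")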
2. Socket-based data sources

Read data from a socket with socketTextStream:

object Test {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the Flink execution environment
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Read data from a socket
    val stream: DataStream[String] = executionEnvironment.socketTextStream("localhost", 1234)
    // 3. Print the data in the DataStream
    stream.print()
    // 4. Start the Flink application
    executionEnvironment.execute("test")
  }
}
3. Collection-based data sources

- fromCollection(seq): creates a data stream from a collection; all elements must be of the same type

val stream: DataStream[Int] = executionEnvironment.fromCollection(List(1, 2, 3, 4))
- fromCollection(iterator): creates a data stream from an iterator; the element type is that of the elements the iterator returns

val stream: DataStream[Int] = executionEnvironment.fromCollection(Iterator(3, 1, 2))
- fromElements(elements: _*): creates a data stream from the given sequence of objects; all objects must be of the same type

val list = List(1, 2, 3)
val stream: DataStream[List[Int]] = executionEnvironment.fromElements(list)
- generateSequence(from, to): generates a sequence of numbers within the given interval, in parallel

val stream: DataStream[Long] = executionEnvironment.generateSequence(1, 10)
5.5 Sink
A data sink consumes the data in a DataStream and forwards it to a file, a socket, an external system, or prints it.
Flink has a number of built-in output formats wrapped in DataStream operations.
1. writeAsText
Writes elements line by line (TextOutputFormat) as strings retrieved by calling each element’s toString() method.
2. writeAsCsv

Writes elements to a file in comma-separated format (CsvOutputFormat); the row and field delimiters are configurable. The value of each field comes from the object's toString() method.
3. print/printToErr
Prints the toString() value of each element to the standard output or standard error stream. Optionally, a prefix can be added to the output to help distinguish different print calls; if the parallelism is greater than 1, the output is also prefixed with an identifier of the task that produced it.
4. writeUsingOutputFormat
A method for custom file output, with a base class (FileOutputFormat) that supports custom object-to-bytes conversion.
5. writeToSocket
Writes the element to the socket.
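A brief sketch of these sinks in use (the paths, host, and port are made up; writeToSocket needs a serialization schema, and SimpleStringSchema is assumed here):

import org.apache.flink.api.common.serialization.SimpleStringSchema

val words: DataStream[String] = executionEnvironment.readTextFile("test00.txt")

words.writeAsText("file:///tmp/out-text")            // one element per line, via toString()
words.map((_, 1)).writeAsCsv("file:///tmp/out-csv")   // writeAsCsv works on tuple streams
words.print()                                         // stdout, prefixed with the subtask index
words.writeToSocket("localhost", 9999, new SimpleStringSchema())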
5.6 Transformation
1. map
DataStream → DataStream: takes one element and produces one element.
// Initialize the Flink execution environment
val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
// Prefixes each line of data with the specified string
val mapDataStream: DataStream[String] = dataStream.map("Apache:" + _)
mapDataStream.print()
// Start the Flink application
executionEnvironment.execute("test")
2. flatMap
DataStream → DataStream: takes one element and produces zero, one, or more elements.
val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
// Split each line on spaces and "flatten" the result
val mapDataStream: DataStream[String] = dataStream.flatMap(_.split(" "))
mapDataStream.print()
3. filter
DataStream → DataStream: evaluates a Boolean predicate for each element and keeps the elements for which it returns true.
val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val mapDataStream: DataStream[String] = dataStream.filter(_.contains("kafka"))
4. Connect
DataStream, DataStream → ConnectedStreams: connects two streams while preserving their types. After connecting, the two streams are placed in one stream, but their data and forms remain unchanged, and the two streams stay independent of each other.
// Initialize the Flink execution environment
val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val listDataStream: DataStream[Int] = executionEnvironment.fromCollection(List(1, 2, 3))
val connStreams: ConnectedStreams[String, Int] = dataStream.connect(listDataStream)
// The first function passed to map applies to the first DataStream of the ConnectedStreams; the second applies to the second DataStream
connStreams.map(e => println(e + " -----"), e => println(e))
// Start the Flink application
executionEnvironment.execute("test")
The test results are as follows:
The map and flatMap operations on ConnectedStreams are called CoMap and CoFlatMap.
They behave like map and flatMap on an ordinary DataStream, except that each of the two functions is applied to the corresponding stream of the ConnectedStreams.
5. split
DataStream → SplitStream: splits a DataStream into two or more DataStreams according to some characteristic.
val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val flatMapDStream: DataStream[String] = dataStream.flatMap(_.split(" "))
val splitDStream: SplitStream[String] = flatMapDStream.split(e => "hadoop".equals(e) match {
  case true => List("hadoop")
  case false => List("other")
})
splitDStream.select("hadoop").print()
Usually used in conjunction with the SELECT operator
6. Union
DataStream → DataStream: unions two or more DataStreams into a new DataStream containing all elements of all of them. Note: if you union a DataStream with itself, every element appears twice in the new DataStream.
val listDStream: DataStream[Int] = executionEnvironment.fromCollection(List(1, 2))
val unionDStream: DataStream[Int] = listDStream.union(listDStream)
unionDStream.print()
7. KeyBy
DataStream → KeyedStream: logically partitions a stream into disjoint partitions; each partition contains elements with the same key. Internally this is implemented with hash partitioning.
val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val kvDStream: DataStream[(String, Int)] = dataStream.flatMap(_.split(" ")).map((_, 1))
val result: KeyedStream[(String, Int), String] = kvDStream.keyBy(_._1)
result.print()
It is usually used in combination with aggregation operators such as reduce
8. Reduce, Fold, Aggregations

KeyedStream → DataStream: a rolling aggregation on a keyed DataStream that combines the current element with the last aggregated value and produces a new value. The returned stream contains the result of every aggregation, not only the final result of the last one.
val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val kvDStream: DataStream[(String, Int)] = dataStream.flatMap(_.split(" ")).map((_, 1))
val result: KeyedStream[(String, Int), String] = kvDStream.keyBy(_._1)
val reduceDStream: DataStream[(String, Int)] = result.reduce((iter1, iter2) => (iter1._1, iter1._2 + iter2._2))
reduceDStream.print()
Notice that, unlike Spark, Flink does not return only the final total; it returns the result of each incremental aggregation. To obtain aggregate statistics over a span of data, Flink's Window must be used (the same holds for fold and the aggregations).
In practice, aggregation operators such as reduce, fold, and the aggregations are used together with Window; only in combination with Window do they produce the desired result.
fold
KeyedStream → DataStream: a rolling fold on a keyed DataStream with an initial value. It combines the current element with the previous fold result and produces a new value; the returned stream contains the result of each fold, not only the final result of the last fold.
Aggregations
KeyedStream → DataStream: rolling aggregations on a keyed DataStream. The difference between min and minBy is that min returns the minimum value, while minBy returns the element whose field holds the minimum (the same applies to max and maxBy). The returned stream contains the result of each aggregation, not only the final result of the last one.
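A brief sketch of fold and the aggregations on a keyed stream (the initial value and fields are chosen for illustration; fold on KeyedStream is deprecated in later Flink versions but matches the description above):

val kvDStream: DataStream[(String, Int)] =
  executionEnvironment.readTextFile("test00.txt").flatMap(_.split(" ")).map((_, 1))
val keyed: KeyedStream[(String, Int), String] = kvDStream.keyBy(_._1)

// fold: rolling fold with an initial value
val folded: DataStream[Int] = keyed.fold(0)((acc, e) => acc + e._2)

// aggregations: rolling sum / min / minBy on a tuple field
val summed  = keyed.sum(1)
val minimum = keyed.min(1)     // keeps the minimum value of field 1
val minElem = keyed.minBy(1)   // keeps the whole element holding the minimal field 1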
6. Time and Window
6.1 Time
In Flink’s streaming processing, different concepts of time are involved, as shown in the following figure:
Event Time: the time at which an event was created. It is usually described by a timestamp inside the event; for example, in collected log data each log line carries its own generation time, and Flink accesses the event timestamp through a timestamp assigner.
Ingestion Time: Indicates the Time when data enters Flink.
Processing Time: the local system time of each operator that performs a time-based operation; it is machine-dependent. Processing Time is the default time characteristic.
For example, a log enters Flink at 2017-11-12 10:00:00.123 and arrives at the window operator at 2017-11-12 10:00:01.234. The log content is as follows:
2017-11-02 18:37:15.624 INFO Fail over to rm2
For the business, which time is the most meaningful when counting the number of failure logs within one minute? EventTime, because we want to count logs according to the time at which they were generated.
Typically we need to specify which field in the log is the eventTime.
6.2 the Window
Windows can be divided into two categories:
- CountWindow: generates windows based on a specified number of elements, independent of time.
- TimeWindow: generates windows based on time.
TimeWindow can be further divided into three categories according to how it is implemented: tumbling windows, sliding windows, and session windows.
CountWindow can be divided into tumbling windows and sliding windows.
1. Tumbling Windows
Slices the data into windows of a fixed length.
Features: Time aligned, fixed window length, no overlap.
The tumbling window assigner assigns each element to a window of the specified size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a 5-minute tumbling window, windows are created as shown below:
Application scenario: Suitable for BI statistics (aggregate calculation for each time period).
2. Sliding Windows
A sliding window is a more general form of a fixed window; it is defined by a window length and a sliding interval.
Features: time-aligned, fixed window length, overlapping.
The sliding window assigner assigns elements to windows of fixed length. Similar to a tumbling window, the window size is configured by the window size parameter; an additional window slide parameter controls how frequently a new sliding window starts. Sliding windows therefore overlap whenever the slide is smaller than the window size, in which case elements are assigned to multiple windows.
For example, with a 10-minute window and a 5-minute slide, every 5 minutes you get a window containing the data of the last 10 minutes, as shown below:
Application scenario: Collects statistics within the latest period (calculate the failure rate of an interface within the latest 5 minutes to determine whether to report an alarm).
3. Session Windows
A session window is formed by a series of events combined with a timeout gap of a specified length, similar to a session in a web application: a new window is created when no data has been received for a period of time.
Features: Unaligned time.
The session window assigner groups elements by session activity. Compared with tumbling and sliding windows, session windows do not overlap and do not have fixed start and end times. Instead, a session window closes when it has not received elements for a fixed period of time, i.e. an inactivity gap. A session window assigner is configured with a session gap that defines the length of the inactive period; when that gap elapses, the current session closes and subsequent elements are assigned to a new session window.
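A rough sketch of a session window (the 10-second gap is arbitrary; keyedWords stands for a keyed stream like the ones built in the examples below):

import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

// A window for a key closes after 10 seconds without new elements for that key
val sessionCounts = keyedWords
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
  .sum(1)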
4. Window API
CountWindow
CountWindow triggers execution based on the number of elements with the same key in the window; only keys whose element count reaches the window size produce a result. Note: CountWindow's window_size refers to the number of elements with the same key, not the total number of input elements.
- Tumbling window

The default CountWindow is a tumbling window. You only need to specify the window size; when the number of elements with a key reaches the window size, the window computation is triggered for that key.
def main(args: Array[String]): Unit = {
  // Initialize the Flink execution environment
  val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost", 1234)
  val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
    val strings: Array[String] = e.split(" ")
    (strings(0), strings(1).toInt)
  })
  val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
  // Reduce and print only when the number of elements with the same key reaches 3
  val windowDStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyDStream.countWindow(3)
  val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1, e2) => (e1._1, e1._2 + e2._2))
  reduceDStream.print()
  // Start the Flink application
  executionEnvironment.execute("test")
}
The test results are as follows:
- Sliding window

Sliding and tumbling count windows use the same function name; the only difference is that two parameters are passed: window_size and sliding_size.

In the code below, sliding_size is set to 2: a computation is triggered every time two more elements with the same key arrive, and each computation covers the last 4 elements of that key.
def main(args: Array[String]): Unit = {
  // Initialize the Flink execution environment
  val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost", 1234)
  val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
    val strings: Array[String] = e.split(" ")
    (strings(0), strings(1).toInt)
  })
  val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
  // When two more elements with the same key arrive, reduce and print over the last 4 elements of that key
  val windowDStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyDStream.countWindow(4, 2)
  val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1, e2) => (e1._1, e1._2 + e2._2))
  reduceDStream.print()
  // Start the Flink application
  executionEnvironment.execute("test")
}
TimeWindow
TimeWindow is a window consisting of all data in a specified time range and computes all data in a window at a time.
- Tumbling window

By default Flink's time windows are divided according to Processing Time, i.e. data is assigned to windows according to the time at which it entered Flink.
// Initialize the Flink execution environment
val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost", 1234)
val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
  val strings: Array[String] = e.split(" ")
  (strings(0), strings(1).toInt)
})
val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
// Every 3 seconds, reduce and print all data with the same key that entered the window
val windowDStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyDStream.timeWindow(Time.seconds(3))
val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1, e2) => (e1._1, e1._2 + e2._2))
reduceDStream.print()
// Start the Flink application
executionEnvironment.execute("test")
- Sliding window

Sliding and tumbling time windows use the same function name; the only difference is that two parameters are passed: window_size and sliding_size.

In the code below, sliding_size is set to 2 seconds: the window is evaluated every 2 seconds, and each evaluation covers all elements of the previous 4 seconds.
// Initialize the Flink execution environment
val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost", 1234)
val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
  val strings: Array[String] = e.split(" ")
  (strings(0), strings(1).toInt)
})
val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
// Every 2 seconds, reduce and print all data of the previous 4 seconds
val windowDStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyDStream.timeWindow(Time.seconds(4), Time.seconds(2))
val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1, e2) => (e1._1, e1._2 + e2._2))
reduceDStream.print()
// Start the Flink application
executionEnvironment.execute("test")
Window Fold
WindowedStream → DataStream: applies a fold function to the window and returns the folded result.
// Get the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create a socket source
val stream = env.socketTextStream("localhost", 11111, '\n', 3)
// Process the stream and aggregate it by key
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
// Introduce a tumbling window
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// Perform the fold operation
val streamFold = streamWindow.fold(100){
(begin, item) =>
begin + item._2
}
// Print the aggregated data
streamFold.print()
// Execute the program
env.execute("TumblingWindow")
Aggregation on Window
WindowedStream → DataStream: aggregates all elements of a window. The difference between min and minBy is that min returns the minimum value, while minBy returns the element whose field holds the minimum (the same applies to max and maxBy).
// Get the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create a socket source
val stream = env.socketTextStream("localhost", 11111)
// Process the stream: split each line and aggregate by key
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)
// Introduce a tumbling window
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// Perform the aggregation operation
val streamMax = streamWindow.max(1)
// Print the aggregated data
streamMax.print()
// Execute the program
env.execute("TumblingWindow")
7. EventTime and Watermark
7.1 Introduction to EventTime
In Flink stream processing, the vast majority of applications use eventTime; ProcessingTime or IngestionTime is generally only used when eventTime is unavailable.
If you want to use EventTime, you need to introduce the time attribute of EventTime as follows:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// From the moment of this call, every stream created from env uses the event-time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Here, the log time is the eventTime that Flink parses according to our rules, not the default processingTime.
The window's time interval is closed on the left and open on the right, so a log with time 2019-01-25 00:00:06 falls into the second window.
7.2 Introduction to Watermark
We know that in stream processing there is a delay between the moment an event is generated and the moment it flows through the source to an operator. Although in most cases data reaches the operator in the order in which the events were generated, out-of-order arrival caused by the network and other factors cannot be ruled out. Out of order means that the sequence of events received by Flink is not strictly ordered by the events' Event Time.
Once data can arrive out of order, if the triggering of a window were decided by eventTime alone we could never be sure that all the data had arrived, yet we cannot wait indefinitely either. There must be a mechanism that guarantees a window is triggered after a certain amount of time; that special mechanism is the Watermark.
A Watermark is a mechanism for measuring the progress of Event Time; watermarks travel within the data stream itself alongside the data.
Watermark is the mechanism for handling out-of-order events: combining the Watermark mechanism with windows is what allows out-of-order events to be handled correctly.
A Watermark in the data stream indicates that all data with timestamps smaller than the Watermark has already arrived, so the execution of a window is also triggered by the Watermark.
A Watermark can be understood as a delayed trigger mechanism. We configure a delay t; the system keeps track of the maximum event time maxEventTime among the data that has arrived and asserts that all data with eventTime smaller than maxEventTime - t has arrived. If the end time of a window is reached or passed by maxEventTime - t, that window is triggered.
To summarize: each time data arrives, compute the maximum eventTime of all data that has reached the window and subtract the delay to obtain the watermark; once the watermark reaches or passes the end time of a window, that window executes its operator.
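As a rough sketch of this rule (the test code later in this chapter uses the built-in BoundedOutOfOrdernessTimestampExtractor instead), a periodic watermark assigner can track maxEventTime and subtract a fixed delay; the tuple layout here is an assumption:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Assumes each element's first field is its event timestamp in milliseconds
class DelayedWatermarkAssigner(delayMs: Long) extends AssignerWithPeriodicWatermarks[(Long, String)] {
  private var maxEventTime: Long = Long.MinValue + delayMs

  override def extractTimestamp(element: (Long, String), previousTimestamp: Long): Long = {
    maxEventTime = math.max(maxEventTime, element._1)
    element._1
  }

  // Watermark = maxEventTime - delay
  override def getCurrentWatermark(): Watermark = new Watermark(maxEventTime - delayMs)
}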
The Watermark for an ordered stream is shown below (Watermark set to 0):
The Watermark for an out-of-order stream is shown below (Watermark set to 2):
Flink generates a Watermark as data arrives; the Watermark equals the maxEventTime of all data that has arrived so far minus the delay. In other words, the Watermark is carried by the data: once a Watermark carried by later data passes the stop time of a not-yet-triggered window, that window's execution is triggered. Because the Watermark is carried by the data, a window that never receives new data will never be triggered.
In the figure above, the maximum allowed lateness is set to 2s, so the Watermark for the event with timestamp 7s is 5s and the Watermark for the event with timestamp 12s is 10s. If window 1 covers 1s~5s and window 2 covers 6s~10s, then the Watermark of the event with timestamp 7s triggers window 1, and the Watermark of the event with timestamp 12s triggers window 2.
7.3 Test Code
// Initialize the Flink execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// Set the Flink time from the default processingTime to eventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source: DataStream[String] = env.socketTextStream("localhost", 1234)
// Set up watermark and how to parse eventTime in each log data
val stream: DataStream[String] = source.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
    override def extractTimestamp(element: String): Long = {
      val time: Long = element.split(" ")(0).toLong
println(time)
time
}
}
)
val keyStream: KeyedStream[(String, Int), Tuple] = stream.map(e => (e.split(" ")(1), 1)).keyBy(0)
// Tumbling window of 5 seconds: each 5-second eventTime interval is computed as one window
val windowStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
val reduceStream: DataStream[(String, Int)] = windowStream.reduce(
(e1, e2) => (e1._1, e1._2 + e2._2)
)
reduceStream.print()
env.execute("test")
The test results are as follows.

If the watermark delay is set to 2 (seconds), the first window is not computed until a log with timestamp 7000 (milliseconds) arrives; logs with larger timestamps enter later windows.

If the window type is changed to SlidingEventTimeWindows, the watermark affects when each sliding window is computed; you can try this out yourself.

If the window is set to EventTimeSessionWindows.withGap(Time.seconds(10)), then what matters is the gap between two pieces of data: computation is triggered only when the gap exceeds the specified time.
8. Summary
Flink is a stream computing engine in the true sense: it achieves the goal of exactly-once on top of low latency and low fault-tolerance overhead. Because of these many advantages, more and more companies are adopting Flink as their stream processing framework, gradually replacing the earlier Storm and Spark solutions.