Preface
Flink is a streaming, real-time computing engine.
That description contains two ideas: streaming and real-time.
Streaming: data flows in continuously, so the data has no boundary, yet any calculation has to run over a bounded set. That raises a question: how is the boundary determined? There are two ways, by time period or by amount of data. By time period, a boundary is drawn every fixed interval; by amount of data, a boundary is drawn every fixed number of records. This article explains in detail how Flink defines these boundaries.
Real-time: the calculation runs as soon as data arrives, and the result is output immediately. There are two kinds of calculation here:
- One only involves the data within the boundary, which is easy to understand. For example, to count how many news items each user has viewed in the last five minutes, take all the data from the last five minutes, group it by user, and count the views in each group.
- The other associates the data within the boundary with external data. For example, to find the regions of the users who viewed news in the last five minutes, join the records of those users with the region dimension table in Hive, and then run the calculation on the joined data.
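The two kinds of calculation can be sketched in plain Scala without any Flink dependency. This is only an illustration: the names `BoundarySketch`, `View`, `viewsPerUser` and `viewsPerRegion` are hypothetical, and an in-memory `Map` stands in for the Hive region dimension table.

```scala
object BoundarySketch {
  // One browsing record: who viewed a news item, and when (epoch millis).
  case class View(user: String, ts: Long)

  // Keep only the records inside the boundary (now - window, now].
  private def inBoundary(events: Seq[View], now: Long, window: Long): Seq[View] =
    events.filter(e => e.ts > now - window && e.ts <= now)

  // Calculation 1: views per user, using only data inside the boundary.
  def viewsPerUser(events: Seq[View], now: Long, window: Long): Map[String, Int] =
    inBoundary(events, now, window).groupBy(_.user).map { case (u, vs) => u -> vs.size }

  // Calculation 2: views per region, joining the bounded data with an
  // external dimension table (a Map standing in for the Hive table).
  def viewsPerRegion(events: Seq[View], now: Long, window: Long,
                     regionOf: Map[String, String]): Map[String, Int] =
    inBoundary(events, now, window)
      .flatMap(v => regionOf.get(v.user))
      .groupBy(identity)
      .map { case (region, rs) => region -> rs.size }

  def main(args: Array[String]): Unit = {
    val events = Seq(View("a", 100L), View("a", 200L), View("b", 250L))
    println(viewsPerUser(events, now = 300L, window = 250L))
    println(viewsPerRegion(events, 300L, 250L, Map("a" -> "north", "b" -> "south")))
  }
}
```

Users missing from the dimension table are simply skipped here; a real job would have to decide how to treat them.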
The rest of this article analyzes Flink in detail around these concepts.
Time and Window
Time
In Flink, time is an extremely important field whenever the boundary is defined by time periods.
Flink has three notions of time:
- Event Time: the time at which an event was created. It is usually described by a timestamp in the event itself. For example, in collected log data each log line records the time it was generated, and Flink reads the event timestamp through a timestamp assigner.
- Ingestion Time: the time at which data enters Flink.
- Processing Time: the local system time of the operator that performs the time-based operation. It is machine-specific. The default time attribute is Processing Time.
For example, a log line enters Flink at 2021-01-22 10:00:00.123 and reaches the window operator at 2021-01-22 10:00:01.234. The log content is:
2021-01-06 18:37:15.624 INFO Fail over to RM2
If the business needs to count the failure logs within one minute, which time is the most meaningful? Event Time, because the logs should be counted by when they were generated.
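To make the choice concrete, here is a minimal sketch in plain Scala that buckets the log line above by its event time rather than by its arrival time. The names `EventTimeSketch`, `eventTime` and `minuteBucket` are hypothetical, and the sketch assumes every log line starts with a 23-character timestamp in the format shown.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

object EventTimeSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

  // Extract the event timestamp from the first 23 characters of a log line.
  def eventTime(line: String): LocalDateTime =
    LocalDateTime.parse(line.take(23), fmt)

  // The one-minute bucket a log line belongs to when counting by event time.
  def minuteBucket(line: String): LocalDateTime =
    eventTime(line).truncatedTo(ChronoUnit.MINUTES)

  def main(args: Array[String]): Unit = {
    val log = "2021-01-06 18:37:15.624 INFO Fail over to RM2"
    println(minuteBucket(log))
  }
}
```

Whenever the line reaches Flink, it is counted in the 18:37 minute of 2021-01-06, which is what a per-minute failure count needs.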
Window
The boundary we have been talking about is the window.
Official explanation: a streaming computation is a data processing engine designed to handle infinite datasets, that is, datasets that keep growing and are essentially unbounded. A window is a means of cutting that infinite data into finite chunks for processing.
So the window is at the heart of processing infinite streams. It splits an infinite stream into buckets of finite size, on which calculations can be performed.
Window types
As mentioned at the beginning of this article, there are two ways to divide windows:
- Time-driven windows (time window), such as every 1 minute or every 10 minutes.
- Data-driven windows (count window), such as every 5 records or every 50 records.
Time windows fall into three categories according to how they are implemented: tumbling windows, sliding windows and session windows.
- Tumbling Windows
Slice the data according to a fixed window length.
Features: time-aligned, fixed window length, no overlap.
The tumbling window assigner assigns each element to a window of the specified size. Tumbling windows have a fixed size and do not overlap.
For example, with a 5-minute tumbling window, the current window is evaluated and a new window is started every five minutes.
Application scenario: suitable for BI statistics (aggregate calculation for each time period).
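The assignment rule for tumbling windows can be sketched in a few lines of plain Scala. This is an illustration under stated assumptions, not Flink's implementation: windows are aligned to timestamp 0 with no offset, timestamps are non-negative milliseconds, and the names `TumblingSketch`, `windowStart` and `assign` are hypothetical.

```scala
object TumblingSketch {
  // Start of the tumbling window that contains `timestamp` (windows aligned to 0).
  def windowStart(timestamp: Long, size: Long): Long =
    timestamp - (timestamp % size)

  // The half-open window [start, end) the element falls into.
  def assign(timestamp: Long, size: Long): (Long, Long) = {
    val start = windowStart(timestamp, size)
    (start, start + size)
  }

  def main(args: Array[String]): Unit = {
    val fiveMinutes = 5 * 60 * 1000L
    // Elements one minute in, just before the 5-minute mark, and exactly on it.
    Seq(60000L, 299999L, 300000L).foreach { ts =>
      println(s"$ts -> ${assign(ts, fiveMinutes)}")
    }
  }
}
```

Every timestamp maps to exactly one window, which is the "no overlap" property listed above.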
- Sliding Windows
A sliding window is a more general form of the fixed window. It is defined by a fixed window length and a slide interval.
Features: time-aligned, fixed window length, windows may overlap.
The sliding window assigner assigns elements to windows of fixed length. As with tumbling windows, the window size is set by the window size parameter. A second parameter, the window slide, controls how often a new sliding window starts. Sliding windows therefore overlap when the slide is smaller than the window size, in which case an element is assigned to multiple windows.
For example, with a 10-minute window and a 5-minute slide, a window fires every 5 minutes and contains the data generated in the last 10 minutes.
Application scenario: statistics over the most recent period (for example, computing the failure rate of an interface over the last 5 minutes to decide whether to raise an alarm).
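How one element ends up in several sliding windows can be sketched as follows. The sketch assumes windows aligned to timestamp 0 and non-negative timestamps; `SlidingSketch` and `assign` are hypothetical names, and the units are whatever the caller uses (minutes in the demo).

```scala
object SlidingSketch {
  // All half-open windows [start, end) that contain `timestamp`, for windows
  // of length `size` that start every `slide` units (aligned to 0).
  def assign(timestamp: Long, size: Long, slide: Long): List[(Long, Long)] = {
    val lastStart = timestamp - (timestamp % slide)
    // Walk backwards from the latest window start while the window still
    // covers the element.
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start > timestamp - size)
      .map(start => (start, start + size))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // 10-minute window, 5-minute slide, element at minute 7.
    println(assign(7L, 10L, 5L))
  }
}
```

With a 10-minute window and a 5-minute slide, the element at minute 7 belongs to both [5, 15) and [0, 10). When the slide equals the size, the result collapses to a single window, which is the tumbling case.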
- Session Windows
A session window is made up of a series of events plus a timeout gap of a specified length, similar to a web application session: if no new data has been received for a period of time, the next element starts a new window.
Features: not time-aligned.
The session window assigner groups elements by session activity. Unlike tumbling and sliding windows, session windows do not overlap and have no fixed start or end time. Instead, a session window closes when it has received no elements for a fixed period of time, that is, when an inactivity gap occurs. A session window is configured with a session gap, which defines the length of the inactive period. When that period elapses, the current session closes and subsequent elements are assigned to a new session window.
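The gap rule can be illustrated with a small batch-style sketch in plain Scala. It assumes all timestamps for one key are available at once and that two elements share a session when they are less than `gap` apart; `SessionSketch` and `sessions` are hypothetical names.

```scala
object SessionSketch {
  // Group event timestamps into sessions: a new session starts whenever the
  // distance to the previous event is at least `gap`.
  def sessions(timestamps: Seq[Long], gap: Long): List[List[Long]] =
    timestamps.sorted.foldLeft(List.empty[List[Long]]) {
      case (current :: done, ts) if ts - current.head < gap =>
        (ts :: current) :: done // still inside the active session
      case (acc, ts) =>
        List(ts) :: acc         // inactivity gap reached: open a new session
    }.map(_.reverse).reverse

  def main(args: Array[String]): Unit = {
    println(sessions(Seq(1L, 2L, 3L, 10L, 11L, 20L), gap = 5L))
  }
}
```

The windows that come out have different lengths and start wherever the data starts, which is why session windows are described as not time-aligned.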
Window API
TimeWindow
A TimeWindow groups all the data within a specified time range into one window and computes over all the data in a window at once.
Let's take the number of cars passing a traffic light as an example:
Cars keep passing the traffic light, so the total number of cars that will pass can never be computed. The traffic never stops, so the computation has no boundary.
So we count the cars passing the traffic light every 15 seconds instead: 2 cars in the first 15 seconds, 3 cars in the second 15 seconds, 1 car in the third 15 seconds, and so on.
- Tumbling time-window (no overlapping data)
We use the nc command on Linux to simulate the data sender:
1. Open port 9999:
nc -lk 9999
2. Send content (the key is the intersection and the value is the number of vehicles passing each time). Send one line at a time; the interval between lines stands for the interval between vehicles:
9,3, 9,2,7, 4,9,6, 1,5,2, 5,7, 5,4
Flink collects the data and computes:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object Window {
  // One message: the intersection id and the number of cars that passed
  case class CarWc(sensorId: Int, carCnt: Int)

  def main(args: Array[String]): Unit = {
    // TODO time-window
    // 1. Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Define the stream source
    val text = env.socketTextStream("localhost", 9999)
    // 3. Convert each line to the CarWc case class
    val ds1: DataStream[CarWc] = text.map { line =>
      val tokens = line.split(",")
      CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
    }
    // 4. Aggregate: one tumbling window per sensorId with a window size of 5 seconds,
    //    that is, every 5 seconds count the cars that passed each intersection
    //    in the past 5 seconds
    val ds2: DataStream[CarWc] = ds1
      .keyBy("sensorId")
      .timeWindow(Time.seconds(5))
      .sum("carCnt")
    // 5. Print the result
    ds2.print()
    // 6. Trigger the stream computation
    env.execute(this.getClass.getName)
  }
}
The data we send carries no time field, so Flink uses the default Processing Time, that is, the time at which the Flink system processes the data.
- Sliding time-window (overlapping data)
// 1. Create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. Define the stream source
val text = env.socketTextStream("localhost", 9999)
// 3. Convert each line to the CarWc case class
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map { line =>
  val tokens = line.split(",")
  CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
// 4. Aggregate: one sliding window per sensorId, window length 10 seconds, slide 5 seconds,
//    that is, every 5 seconds count the cars that passed each intersection
//    in the past 10 seconds
val ds2: DataStream[CarWc] = ds1
  .keyBy("sensorId")
  .timeWindow(Time.seconds(10), Time.seconds(5))
  .sum("carCnt")
// 5. Print the result
ds2.print()
// 6. Trigger the stream computation
env.execute(this.getClass.getName)
CountWindow
A CountWindow triggers execution based on the number of elements with the same key in the window. Only keys whose element count has reached the window size produce a result.
Note: the window_size of a CountWindow refers to the number of elements with the same key, not the total number of elements received.
- Tumbling count-window (no overlapping data)
// 1. Create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. Define the stream source
val text = env.socketTextStream("localhost", 9999)
// 3. Convert each line to the CarWc case class
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map { f =>
  val tokens = f.split(",")
  CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
// 4. Aggregate: one tumbling window per sensorId with a window size of 5.
//    Counting is done per key: a result is emitted each time a key
//    has occurred 5 times
val ds2: DataStream[CarWc] = ds1
  .keyBy("sensorId")
  .countWindow(5)
  .sum("carCnt")
// 5. Print the result
ds2.print()
// 6. Trigger the stream computation
env.execute(this.getClass.getName)
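The per-key counting described above can be imitated in plain Scala to see when a result is emitted. This is a simplified model, not Flink code: `CountWindowSketch` and `tumblingCount` are hypothetical names, and events are `(sensorId, carCnt)` pairs in arrival order.

```scala
object CountWindowSketch {
  // Emit (key, sum) each time a key has collected `size` elements, then
  // start a fresh window for that key. Keys that never reach `size`
  // produce no output.
  def tumblingCount(events: Seq[(Int, Int)], size: Int): List[(Int, Int)] = {
    val start = (Map.empty[Int, List[Int]], List.empty[(Int, Int)])
    val (_, emitted) = events.foldLeft(start) {
      case ((buffers, out), (key, value)) =>
        val buf = value :: buffers.getOrElse(key, Nil)
        if (buf.size == size) (buffers - key, (key, buf.sum) :: out)
        else (buffers.updated(key, buf), out)
    }
    emitted.reverse
  }

  def main(args: Array[String]): Unit = {
    // Four events in total, but only key 1 reaches the window size of 3.
    println(tumblingCount(Seq((1, 1), (1, 2), (2, 10), (1, 3)), size = 3))
  }
}
```

Key 2 has received data, yet nothing is emitted for it, because the window size counts elements per key rather than elements overall.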
- Sliding count-window
Like the sliding time window, it takes a window length and a slide: here the window length is 5 and the slide is 3.
// 1. Create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. Define the stream source
val text = env.socketTextStream("localhost", 9999)
// 3. Convert each line to the CarWc case class
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map { f =>
  val tokens = f.split(",")
  CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
// 4. Aggregate: one sliding window per sensorId, window size 5 elements, slide 3 elements.
//    In other words, each intersection is counted separately: every time 3 messages
//    for an intersection have been received, count the cars in its last 5 messages
val ds2: DataStream[CarWc] = ds1
  .keyBy("sensorId")
  .countWindow(5, 3)
  .sum("carCnt")
// 5. Print the result
ds2.print()
// 6. Trigger the stream computation
env.execute(this.getClass.getName)
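A sliding count window for a single key can be modelled as below. The sketch assumes the window is evaluated every `slide` elements over at most the last `size` elements, so the first evaluation may see fewer than `size` elements; that is an assumption about the behaviour, and `SlidingCountSketch` and `slidingCount` are hypothetical names.

```scala
object SlidingCountSketch {
  // For one key: every `slide` elements, emit the sum of the last `size`
  // elements seen so far (fewer if not enough have arrived yet).
  def slidingCount(values: Seq[Int], size: Int, slide: Int): List[Int] =
    (slide to values.length by slide).map { n =>
      values.slice(math.max(0, n - size), n).sum
    }.toList

  def main(args: Array[String]): Unit = {
    // Seven messages for one intersection, window 5, slide 3.
    println(slidingCount(Seq(1, 2, 3, 4, 5, 6, 7), size = 5, slide = 3))
  }
}
```

The seventh message produces no output on its own: the next evaluation only happens once the ninth message for that key arrives.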
- Window summary
Flink supports two ways to divide windows (time and count):
- If the window is divided by time, it is a time-window.
- If the window is divided by count, it is a count-window.
Flink windows have two important properties (size and interval):
- If size = interval, it is a tumbling-window (no overlapping data).
- If size > interval, it is a sliding-window (overlapping data).
- If size < interval, the window loses data. For example, if every 5 seconds you count the cars that passed an intersection in the past 3 seconds, you miss 2 seconds of data.
Combining these gives four basic windows:
- time-tumbling-window: a time window without overlapping data. Example: timeWindow(Time.seconds(5))
- time-sliding-window: a time window with overlapping data. Example: timeWindow(Time.seconds(5), Time.seconds(3))
- count-tumbling-window: a count window without overlapping data. Example: countWindow(5)
- count-sliding-window: a count window with overlapping data. Example: countWindow(5, 3)
Window Reduce
WindowedStream → DataStream: applies a reduce function to the window and returns the aggregated result.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object StreamWindowReduce {
  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Create the SocketSource (host and port assumed to match the other examples in this article)
    val stream = env.socketTextStream("node01", 9999)
    // Map each item to (item, 1) and key the stream by the first field
    val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
    // Introduce a 5-second time window
    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
    // Perform the aggregation
    val streamReduce = streamWindow.reduce(
      (item1, item2) => (item1._1, item1._2 + item2._2)
    )
    // Print the aggregated data
    streamReduce.print()
    // Execute the program
    env.execute("TumblingWindow")
  }
}
Window Apply
The apply method lets you do custom processing through the methods of an anonymous inner class. Use it when the calculation is more complex.
Usage
- Implement a WindowFunction class
- Specify the class's type parameters as [input data type, output data type, type of the grouping field used in keyBy, window type]
Example: use the apply method to implement a word count
Steps:
- Get the stream processing environment
- Build the socket data source, specifying the IP address and port number
- Convert the received data into word tuples
- Group the stream with keyBy
- Use timeWindow to specify the window length (computed every 3 seconds)
- Implement a WindowFunction anonymous inner class
- Implement the aggregate calculation in the apply method
- Emit the result with Collector.collect
The core code is as follows:
// 1. Get the stream processing environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. Build the socket data source and split each line into words
val textDataStream = env.socketTextStream("node01", 9999).flatMap(_.split(" "))
// 3. Convert each word into a (word, 1) tuple
val wordDataStream = textDataStream.map(_ -> 1)
// 4. Group the stream with keyBy
val groupedDataStream: KeyedStream[(String, Int), String] = wordDataStream.keyBy(_._1)
// 5. Use timeWindow to specify the window length (computed every 3 seconds)
val windowDataStream: WindowedStream[(String, Int), String, TimeWindow] =
  groupedDataStream.timeWindow(Time.seconds(3))
// 6. Implement a WindowFunction anonymous inner class
val reduceDatStream: DataStream[(String, Int)] = windowDataStream.apply(
  new RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
    // 7. Implement the aggregate calculation in the apply method
    override def apply(key: String, window: TimeWindow,
                       input: Iterable[(String, Int)],
                       out: Collector[(String, Int)]): Unit = {
      println("hello world")
      val tuple = input.reduce((t1, t2) => (t1._1, t1._2 + t2._2))
      // 8. Send the result downstream
      out.collect(tuple)
    }
  })
reduceDatStream.print()
env.execute()
Window Fold
WindowedStream → DataStream: applies a fold function to the window and returns the folded result.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object StreamWindowFold {
  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Create the SocketSource
    val stream = env.socketTextStream("node01", 9999, '\n', 3)
    // Map each item to (item, 1) and key the stream by the first field
    val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
    // Introduce a 5-second tumbling window
    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
    // Perform the fold, starting from the initial value 100
    val streamFold = streamWindow.fold(100) {
      (begin, item) => begin + item._2
    }
    // Print the aggregated data
    streamFold.print()
    // Execute the program
    env.execute("TumblingWindow")
  }
}
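The difference between reducing and folding a window's contents can be seen on an ordinary Scala collection. This is only an analogy for what happens inside one window for one key; `FoldSketch`, `windowReduce` and `windowFold` are hypothetical names.

```scala
object FoldSketch {
  // Reduce: combines elements pairwise, so the result has the element type.
  def windowReduce(window: Seq[(String, Int)]): (String, Int) =
    window.reduce((a, b) => (a._1, a._2 + b._2))

  // Fold: starts from an initial value, so the result may have another type.
  def windowFold(window: Seq[(String, Int)], initial: Int): Int =
    window.foldLeft(initial) { (begin, item) => begin + item._2 }

  def main(args: Array[String]): Unit = {
    val window = Seq(("a", 1), ("a", 1), ("a", 1))
    println(windowReduce(window))
    println(windowFold(window, 100))
  }
}
```

With the initial value 100 used in the example above, three occurrences of a word fold to 103, while reduce yields the tuple ("a", 3).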
Aggregation on Window
WindowedStream → DataStream: aggregates all elements of a window. The difference between min and minBy is that min returns the minimum value, while minBy returns the element that contains the minimum value in the given field (the same applies to max and maxBy).
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

object StreamWindowAggregation {
  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Create the SocketSource (host and port assumed to match the other examples in this article)
    val stream = env.socketTextStream("node01", 9999)
    // Split each line into (first field, second field) and key by the first field
    val streamKeyBy = stream
      .map(item => (item.split(" ")(0), item.split(" ")(1)))
      .keyBy(0)
    // Introduce a 5-second tumbling window
    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
    // Perform the aggregation on the second field
    val streamMax = streamWindow.max(1)
    // Print the aggregated data
    streamMax.print()
    // Execute the program
    env.execute("TumblingWindow")
  }
}
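The min versus minBy distinction is easier to see on a record with more than two fields. The sketch below is plain Scala, not Flink: it assumes that with min only the aggregated field is meaningful and that the other fields come from the first element of the window, which is an assumption made for illustration. `MinSketch` and `Reading` are hypothetical names.

```scala
object MinSketch {
  case class Reading(sensor: String, region: String, temp: Int)

  // Like min: the aggregated field holds the minimum, the remaining fields
  // are taken from the first element (an assumption of this sketch).
  def min(window: Seq[Reading]): Reading =
    window.head.copy(temp = window.map(_.temp).min)

  // Like minBy: the whole element that holds the minimum is returned.
  def minBy(window: Seq[Reading]): Reading =
    window.minBy(_.temp)

  def main(args: Array[String]): Unit = {
    val window = Seq(Reading("s1", "north", 30), Reading("s1", "south", 20))
    println(min(window))
    println(minBy(window))
  }
}
```

Both results report a temperature of 20, but only minBy tells you that it was measured in the south. If the other fields matter, minBy and maxBy are the safer choice.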
EventTime and Windows
Introducing EventTime
- Time in a stream processor is not the same as time in the real world. Flink distinguishes three kinds: event time, ingestion time and processing time.
- If the time window is defined on EventTime, it forms an EventTimeWindow, which requires each message to carry its own event time.
- If the time window is defined on IngestionTime, it forms an IngestionTimeWindow, based on the system time of the source.
- If the time window is defined on ProcessingTime, it forms a ProcessingTimeWindow, based on the system time of the operator.
In Flink stream processing, most businesses use EventTime. ProcessingTime or IngestionTime is generally used only when EventTime is unavailable.
To use EventTime, set the time characteristic as follows:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// From this point on, every stream created from env uses the EventTime characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Watermark
Introduction
Stream processing involves a path and some elapsed time from the moment an event is generated, through the source, to the operator. In most cases data reaches the operator in the order in which the events were generated, but network delays, backpressure and other causes can deliver it out of order. Out of order means that the sequence of events Flink receives does not strictly follow their Event Time. Flink's design took network delay and reordering into account from the start and introduced an abstraction for them: the Watermark.
Once data can arrive out of order, if window execution is decided by EventTime alone, we can never be sure that all the data has arrived, yet we cannot wait indefinitely either. There must be a mechanism guaranteeing that after a certain time the window is triggered to compute. That mechanism is the Watermark.
Watermarks exist to handle out-of-order events, and handling them correctly is usually done by combining the Watermark mechanism with windows.
A Watermark in the data stream indicates that all data with a timestamp smaller than the Watermark has already arrived, so window execution is also triggered by the Watermark.
A Watermark can be understood as a delayed trigger mechanism. We set a delay t for the Watermark. Each time, the system checks the maximum event time, maxEventTime, among the data that has arrived so far and assumes that all data with an EventTime smaller than maxEventTime - t has arrived. If there is a window whose end time is equal to maxEventTime - t, that window is triggered.
Figure: Watermarks in an ordered stream (Watermark delay set to 0).
Figure: Watermarks in an out-of-order stream (Watermark delay set to 2).
When Flink receives a piece of data, it generates a Watermark equal to the maxEventTime of all data received so far minus the delay. In other words, the Watermark is carried by data. As soon as the Watermark carried by the data is later than the end time of a window that has not yet fired, that window is triggered. Because the Watermark is carried by data, windows that have not fired will never fire if no new data arrives while the job is running.
In the out-of-order example, the maximum allowed delay is 2s, so the Watermark of the event with timestamp 7s is 5s, and the Watermark of the event with timestamp 12s is 10s. If window 1 covers 1s~5s and window 2 covers 6s~10s, then the Watermark of the event with timestamp 7s triggers window 1, and the Watermark of the event with timestamp 12s triggers window 2.
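The arithmetic in this example can be replayed with a short plain-Scala sketch. It follows the simplified convention used in this section, where a window fires once the Watermark reaches its end time, and it makes up an arrival order for illustration: the event sequence 1, 4, 3, 7, 9, 12 is hypothetical, as are the names `WatermarkSketch`, `watermarks` and `firedBy`.

```scala
object WatermarkSketch {
  // Watermark after each event: the maximum event time seen so far minus the delay.
  def watermarks(eventTimes: Seq[Long], delay: Long): List[Long] =
    eventTimes
      .scanLeft(Long.MinValue)((a, b) => math.max(a, b))
      .tail
      .map(_ - delay)
      .toList

  // Event time of the first event whose Watermark reaches `windowEnd`, if any.
  def firedBy(eventTimes: Seq[Long], delay: Long, windowEnd: Long): Option[Long] =
    eventTimes.zip(watermarks(eventTimes, delay))
      .collectFirst { case (ts, wm) if wm >= windowEnd => ts }

  def main(args: Array[String]): Unit = {
    val events = Seq(1L, 4L, 3L, 7L, 9L, 12L) // seconds, slightly out of order
    println(watermarks(events, delay = 2L))
    println(firedBy(events, 2L, windowEnd = 5L))
    println(firedBy(events, 2L, windowEnd = 10L))
  }
}
```

The out-of-order event 3 does not move the Watermark backwards, and the windows ending at 5 and 10 are fired by the events with timestamps 7 and 12, as in the text. If no event ever pushes the Watermark far enough, `firedBy` returns `None`, which mirrors a window that never fires when data stops arriving.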
Flink's handling of late data
The Watermark and window mechanisms solve the problem of out-of-order streaming data: data that arrives out of order because of delays can still be processed according to its EventTime. For data that arrives later still, Flink has its own solution: specify an allowed lateness, and within that time range late data is still accepted and processed.
The allowed lateness is set with allowedLateness(lateness: Time)
Late data is saved with sideOutputLateData(outputTag: OutputTag[T])
Late data is retrieved with DataStream.getSideOutput(tag: OutputTag[X])
The specific usage is as follows:
allowedLateness(lateness: Time)
def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {
javaStream.allowedLateness(lateness)
this
}
This method takes a Time value that sets how late data is allowed to be. This is a different concept from the time used in the WaterMark. To recap:
WaterMark = event time of the data - allowed out-of-orderness
As new data arrives, the WaterMark is updated to the latest event time minus the allowed out-of-orderness. If older data arrives, the WaterMark is not updated. In short, the WaterMark exists so that as much out-of-order data as possible is received.
The Time value here, by contrast, is about waiting for late data: within that time range, if data belonging to the window arrives, the window is still computed. How this is calculated is explained in detail below.
Note: this method only applies to event-time windows. If the window is based on processing time and a non-zero value is specified, an exception is thrown.
sideOutputLateData(outputTag: OutputTag[T])
def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
javaStream.sideOutputLateData(outputTag)
this
}
This method saves late data under the given outputTag parameter, an object used to mark the late data.
DataStream.getSideOutput(tag: OutputTag[X])
Call this method on the DataStream returned by a window operation, passing in the object that marks the late data, to fetch the late data.
Understanding late data
Late data refers to:
Data that still triggers the window after the window has already been computed. For example, after the window covering 10-15 has been computed, another element belonging to it (say with event time 13) arrives and triggers the window operation again. Such data is called late data.
So the question is: how is the lateness calculated?
Assume a window range of 10-15 and an allowed lateness of 2s. As long as WaterMark < 15+2 and the element belongs to the window, the window operation can still be triggered. Once WaterMark >= 15+2, the 10-15 window no longer triggers, even if the Event Time of the new data falls within the window.
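The rule for the 10-15 window can be written down as a small decision function. This is a plain-Scala sketch of the comparison described above, not Flink's implementation; `LatenessSketch`, `classify` and the three outcome names are hypothetical.

```scala
object LatenessSketch {
  sealed trait Outcome
  case object OnTime extends Outcome     // the window has not fired yet
  case object LateFiring extends Outcome // the window fired, but is computed again
  case object Dropped extends Outcome    // too late: ignored, or sent to the side output

  // Decide what happens to an element that belongs to a window ending at
  // `windowEnd`, given the current watermark and the allowed lateness.
  def classify(watermark: Long, windowEnd: Long, allowedLateness: Long): Outcome =
    if (watermark < windowEnd) OnTime
    else if (watermark < windowEnd + allowedLateness) LateFiring
    else Dropped

  def main(args: Array[String]): Unit = {
    // Window 10-15 with an allowed lateness of 2.
    Seq(14L, 16L, 17L).foreach { wm =>
      println(s"watermark $wm -> ${classify(wm, windowEnd = 15L, allowedLateness = 2L)}")
    }
  }
}
```

An element with event time 13 is treated differently depending only on where the watermark stands when it arrives: at watermark 16 the window is computed again, at watermark 17 it is not.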
Flink joins with a Hive partitioned table
Flink 1.12 supports using the latest Hive partition as a temporal table. A stream can be joined directly with the latest partition of a Hive table in SQL. Flink automatically monitors for the latest Hive partition and, when a new partition is detected, replaces the full dimension table data with it. In this way a Kafka stream can be joined with the latest Hive partition in real time to widen the data, without writing a DataStream program.
Specific usage:
Register the HiveCatalog in the SQL Client
vim conf/sql-client-defaults.yaml
catalogs:
  - name: hive_catalog
    type: hive
    hive-conf-dir: /disk0/soft/hive-conf/ # this directory must contain the hive-site.xml file
Create the Kafka table
CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (
    master Row<reportDate String, groupID int, shopID int, shopName String, action int, orderStatus int, orderKey String, actionTime bigint, areaName String, paidAmount double, foodAmount double, startTime String, person double, orderSubType int, checkoutTime String>,
    proctime AS PROCTIME() -- processing-time attribute referenced by the temporal join
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic_name',
    'format' = 'json',
    'properties.bootstrap.servers' = 'host:9092',
    'properties.group.id' = 'flinkTestGroup',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1607844694000'
);
Join the Flink fact table with the latest Hive partition data
dim_extend_shop_info is an existing table in Hive, so we use a table hint to enable the dimension table parameters dynamically.
CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as
SELECT * FROM
(select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,
ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn
from hive_catalog.flink_db.kfk_fact_bill_master_12 t1
JOIN hive_catalog.flink_db.dim_extend_shop_info
/*+ OPTIONS('streaming-source.enable'='true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '1 h',
'streaming-source.partition-order' = 'partition-name') */
FOR SYSTEM_TIME AS OF t1.proctime AS t2 -- temporal table
ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id
where groupID in (202042)) t where t.rn = 1
Parameter description:
- streaming-source.enable: enables streaming reads of Hive data.
- streaming-source.partition.include has the following two values:
- latest: only the latest partition data is read.
- all: reads all partition data. The default value is all. latest can only be used in a temporal join, to read the latest partition as a dimension table; it cannot be used to read the latest partition data directly.
- streaming-source.monitor-interval: how often to check for new partitions. The minimum is 1 hour, because in the current implementation every task queries the Metastore, and frequent queries may put too much pressure on it. Note that 1.12.1 lifts this restriction, but it is still advisable not to set the interval too short in production.
- streaming-source.partition-order: the partition ordering strategy. There are three options, of which partition-name is the most recommended:
- partition-name: loads the latest partition by the default partition name order
- create-time: orders partitions by the creation time of the partition files
- partition-time: orders partitions by partition time