❝
Hello everyone, I'm Later. Here I share my experience from study and work, and I hope some of my articles are helpful to you. All of my articles are published on my official account; you're welcome to follow "Later X Big Data". Thank you for your support and recognition.
❞
@ Contents
- Window
- Time semantics
- Rolling window
- The sliding window
- The session window
- Summary of window knowledge points
- The watermark (WaterMark)
- So how do we get the watermark?
- Periodic watermarks
- Punctuated (marker) watermarks
- About parallelism and watermarks
- Exactly-once
In chapter one we walked through Flink's WordCount, and in chapter two's API examples we simply took data in, applied a few basic transformations, and wrote it straight back out.
Everything so far has been event-driven: a record comes in and we process it once. But now a problem arises.
Compare WordCount's running count to a company's order amount, which only grows as orders come in. Suppose the operations team asks for the following:
- Output the total amount of every 1,000 orders
- Every 5 minutes, output the total order amount of the last 5 minutes
- Every 3 seconds, output the cumulative turnover of the last 5 minutes
- If two consecutive orders are more than 30 seconds apart, split the stream into two groups at that point and output the total amount of the earlier group
How would you approach these requirements, given that time keeps flowing?
With them in mind, let's take a look at Flink's windows. (A rough mapping from each requirement to a window type is sketched just below.)
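As a preview — this is my own sketch, not code from the article's later examples — here is roughly how each of the four requirements could map onto a Flink window type, using the same old-style Scala API used throughout this chapter; the (userId, amount) stream, host, and port are made-up placeholders:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

object RequirementsPreview {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Hypothetical order stream: "userId,amount" lines -> (userId, amount)
    val orders = env.socketTextStream("localhost", 9999)
      .map { line => val f = line.split(","); (f(0), f(1).toDouble) }
      .keyBy(0)

    orders.countWindow(1000).sum(1).print("req1")                            // 1: every 1,000 orders
    orders.timeWindow(Time.minutes(5)).sum(1).print("req2")                  // 2: last 5 min, every 5 min
    orders.timeWindow(Time.minutes(5), Time.seconds(3)).sum(1).print("req3") // 3: last 5 min, every 3 s
    orders.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
      .sum(1).print("req4")                                                  // 4: split on a 30 s gap

    env.execute()
  }
}
```

Each of these is explained in turn below.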
Window
Windows: whether it's the window functions in Hive, the window functions in Spark batch jobs, or the windows here, the essence is the same: "partition the data, then compute over each partition."
Windows are therefore at the heart of processing infinite streams. A window splits the stream into finite-size "buckets" over which we can apply computations.
In Flink, windowed programs generally fall into two categories:
- Keyed windows
```
stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply() <- required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
```
- Non-keyed windows
```
stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply() <- required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
```
“The only difference: a keyed stream calls keyBy(…) followed by window(…), while a non-keyed stream calls windowAll(…).”
The window’s life cycle
We said above that a window divides data into different "buckets" for computation. So when is a bucket created, and when is it finished? In short, a window is created as soon as the first element that belongs to it arrives, and Flink removes the window once time passes the end timestamp the user configured.
Let's look at the types of windows:
- CountWindow: generates windows based on a specified number of elements, regardless of time.
- TimeWindow: generates windows based on time, in three flavors: 1. tumbling (rolling) windows 2. sliding windows 3. session windows
As the name suggests, CountWindow builds windows from element counts. Sample code:
```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object CountWindowsTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val wordDS = env.socketTextStream("master102", 3456)

    wordDS
      .map((_, 1))
      .keyBy(0)
      // Accumulate three elements of a single key before processing
      .countWindow(3)
      .sum(1)
      .print("Test:")

    env.execute()
  }
}
```
Running it, you can see that different words go into different windows according to keyBy, and a window emits its result once the element count for a single key reaches 3.
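For reference, countWindow also has a two-argument sliding variant (size, slide) in this same API — a sketch reusing the wordDS stream from above:

```scala
// Sliding count window: evaluate the last 3 elements of a key,
// firing every time 2 new elements arrive for that key
wordDS
  .map((_, 1))
  .keyBy(0)
  .countWindow(3, 2)
  .sum(1)
  .print("SlidingCount:")
```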
Next we'll focus on time windows. "These windows start and end according to time," which brings us to the second topic of the day: time semantics.
Time semantics
Flink supports different concepts of time in streaming applications:
- Event Time: the time at which each event occurred on its producing device, usually described by a timestamp embedded in the event. For example, in collected log data, each log line records its own generation time; Flink accesses event timestamps through a timestamp assigner.
- Ingestion Time: "the time at which data enters Flink." Conceptually it sits between event time and processing time.
- Processing Time: "the system time of the machine executing the operator; this is the default time attribute." Processing time is the simplest notion of time but offers no determinism, because it depends on the speed at which data arrives at the system (for example, from a message queue) and flows between operators.
Which time type to use is a business decision; generally, Event Time is used most. For example, to compute the total order amount of the last 5 minutes, we need the real order times, not the times the orders entered Flink or were processed.
"In Flink streaming jobs, most businesses use EventTime; ProcessingTime or IngestionTime is usually a fallback only when EventTime is unavailable. By default, the Flink framework uses ProcessingTime. To use EventTime, you need to set the EventTime time characteristic as follows:"
```scala
import org.apache.flink.streaming.api.TimeCharacteristic

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
```
“Note here: if you want to use event time, you must define event-time timestamps on the data and register watermarks.”
OK, that's a new concept: the watermark.
We'll leave it at the concept for now, return to window types, and then discuss how watermarks are used in detail.
For example: two orders are placed at the same order time, one slightly before the other, but the first order's network connection is poor, so it reaches the Flink window later. This is what we commonly call out-of-order data. What should we do about it? We'll come back to that.
"Special note: windows are left-closed and right-open," so an element timestamped exactly at a window's end belongs to the next window.
Rolling window
Tumbling (rolling) windows have a fixed size and do not overlap. For example, with a 5-minute tumbling window, a new window starts every five minutes. The sample code is as follows:
```scala
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
// needed for the .window(...) variant below; missing from the original listing
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * @description: 5-second tumbling processing-time window
 * @author: Liu Jun Jun
 * @create: 2020-06-29
 */
object WindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val dataDS = env.socketTextStream("bigdata101", 3456)

    dataDS.map(str => {
      val strings = str.split(",")
      (strings(0), strings(1).toLong, 1)
    }).keyBy(0)
      // 5s tumbling window; .timeWindow(Time.seconds(5)) is equivalent
      //.timeWindow(Time.seconds(5))
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .apply {
        (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => {
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          //out.collect(s"window:[${sdf.format(new Date(window.getStart))}-${sdf.format(new Date(window.getEnd))}]:{ ${es.mkString(",")} }")
          out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }")
        }
      }
      .print("windows:>>>")

    env.execute()
  }
}
```
If you run it, you'll probably notice that the timestamp we typed has no effect; the default really is processing time. You can also see that tumbling windows do not overlap: a record belongs to exactly one window, and windows are left-closed, right-open.
The sliding window
Sliding windows also have a fixed length, but they slide at a separate frequency. When the slide is smaller than the window size, "sliding windows overlap, and an element is assigned to multiple windows." For example, a 10-minute window sliding every 5 minutes gives you, every five minutes, a window containing the events that arrived in the last 10 minutes. Below I post only the changed code; the rest is the same as the tumbling-window code above:
```scala
// A 5-second window, sliding every 3 seconds
// .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3))) is equivalent
.timeWindow(Time.seconds(5), Time.seconds(3))
```
The key point: Flink's default window assignment is aligned to time zero (the epoch). For example, 5-second windows split as [0,5), [5,10), [10,15), …, and 3-second windows as [0,3), [3,6), [6,9), …
The session window
Compared with tumbling and sliding windows, "session windows do not overlap and have no fixed start and end times." Instead, a session window closes when no element has been received for a certain period (a gap of inactivity); subsequent elements are assigned to a new session window.
```scala
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
```
As you can see, this time the window size is not fixed: when I test it, the first window's output appears only after I stop typing and wait a moment. Whenever the processing-time gap exceeds 10 seconds, the previous window is emitted.
Summary of window knowledge points:
- All of the windows above can be switched to event time, and intervals can be specified as Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on. Once event time is specified, the windows become standard event-time windows; if you use the window() method, change the assigner accordingly:
```scala
// Tumbling window
// Event time
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// Processing time
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

// Sliding window
// Event time
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
// Processing time
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

// Session window
// Event time
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
// Processing time
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
```
- Flink's default windows are aligned to time zero: 5-second windows split as [0,5), [5,10), [10,15), …; 3-second windows as [0,3), [3,6), [6,9), …
Flink also lets you set a window offset. It's rarely needed in practice, but here is simply how to use it:
```scala
// A 5-second window offset by 3 seconds
.window(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3)))
```
In my test, the window shifted from [80s, 85s) to [83s, 88s). Let me summarize the offset variants of each method:
```scala
// Window offset method summary

// Tumbling window
// Event time
.window(TumblingEventTimeWindows.of(Time.seconds(5), Time.seconds(3)))
// Processing time
.window(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3)))

// Sliding window
// Event time
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5), Time.seconds(3)))
// Processing time
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5), Time.seconds(3)))

// Session windows define a gap rather than fixed boundaries,
// so withGap() takes no offset parameter
```
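Both the alignment to time zero and the offset behavior fall out of one piece of modular arithmetic. The sketch below is my Scala rendering of the logic in Flink's TimeWindow.getWindowStartWithOffset, and it reproduces the [80s, 85s) → [83s, 88s) shift from the test above:

```scala
// Window start = timestamp rounded down to the nearest multiple of
// windowSize, shifted by offset (all values in milliseconds)
def windowStart(timestamp: Long, offset: Long, windowSize: Long): Long =
  timestamp - (timestamp - offset + windowSize) % windowSize

windowStart(84000L, 0L, 5000L)    // = 80000 -> window [80s, 85s), no offset
windowStart(84000L, 3000L, 5000L) // = 83000 -> window [83s, 88s), 3s offset
```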
That about covers the use of windows; next up, the watermark.
The watermark (WaterMark)
What does a WaterMark do? "A stream processor that supports event time needs a way to measure the progress of event time." For example, with a one-hour event-time window, once event time has just passed the hour, the operator building the hourly windows needs to be notified so it can close the window in progress.
So the question becomes: how do we measure that the time is up? The mechanism Flink uses to measure event-time progress is the watermark.
“Note: not every record generates a watermark. A watermark is itself a record, part of the stream data. A watermark is a global value, not a value under a particular key, so the watermark advances even when records come from different keys.”
The watermark also plays an important role in handling delayed data. We mentioned out-of-order data at the start of the article; some records are delayed a few seconds by the network, so we can "treat the watermark as the last possible trigger time of the window."
For example, with a 5-second tumbling window, say [5, 10), suppose we predict data may generally be up to 3 seconds late. We'd like the window to keep waiting 3 more seconds after data timestamped 10s arrives, to see whether records that belong in [5, 10) straggle in. Once event time reaches 13s or beyond, the window fires on its data. This is delay handling (see periodic watermarks below).
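To make that concrete, here is a worked trace of the numbers in the paragraph above — my own arithmetic, assuming the [5s, 10s) tumbling event-time window and a 3-second watermark delay:

```scala
// watermark = max event time seen so far - delay;
// a window [start, end) fires once end <= watermark
def watermark(maxEventTimeSec: Long, delaySec: Long): Long =
  maxEventTimeSec - delaySec

watermark(10, 3) // = 7  -> [5s,10s) keeps waiting (needs watermark >= 10)
watermark(12, 3) // = 9  -> still waiting
watermark(13, 3) // = 10 -> window [5s,10s) fires
```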
So how do we get the watermark?
There are two ways to assign timestamps and generate watermarks:
- Directly in the data stream source (I don't yet know which sources can generate timestamps and watermarks directly, so I won't discuss this here)
- Through a timestamp assigner / watermark generator: in Flink, the timestamp assigner also defines the watermarks to be emitted. Note that both timestamps and watermarks are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z. (This is the most common approach.)
“Using event time requires the data source to carry a timestamp; otherwise the program cannot know what the event time is (data without timestamps can only be used with processing time).”
So let's generate watermarks the second way. Note that the assigner must be specified before "the first event-time operation" (for example, the first window operation). There are two interfaces for registering watermarks:
- AssignerWithPeriodicWatermarks (periodically generated watermarks)
- AssignerWithPunctuatedWatermarks (punctuated / marker-generated watermarks)
Let's take them one at a time, starting with periodic watermark generation:
Periodic watermarks
By default Flink generates a watermark every 200 ms (in event-time mode), as you can see in StreamExecutionEnvironment's source:

```java
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}
```

We can change that interval; the unit is milliseconds, so here I simulate 10s:

```scala
env.getConfig.setAutoWatermarkInterval(10000)
```
Note that "this interval means 10s of system time, not 10s of event time — don't confuse the two. If you don't believe me, wait for my test case."
```scala
import java.text.SimpleDateFormat

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * @description: simulates event time as the standard, with a 5-second tumbling window
 * @author: Liu Jun Jun
 * @create: 2020-06-28
 */
object WaterMarkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time as the baseline
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Set parallelism to 1; the multi-parallelism case is tested later
    env.setParallelism(1)
    // Generate a watermark every 10 seconds (of system time)
    env.getConfig.setAutoWatermarkInterval(10000)

    val dataDS = env.socketTextStream("bigdata101", 3456)

    val tsDS = dataDS.map(str => {
      val strings = str.split(",")
      (strings(0), strings(1).toLong, 1)
    }).assignTimestampsAndWatermarks(
      new AssignerWithPeriodicWatermarks[(String, Long, Int)] {
        var maxTs: Long = 0
        // Called periodically to get the watermark: max event time minus a 5-second delay
        override def getCurrentWatermark: Watermark = new Watermark(maxTs - 5000)
        // Extracts the event timestamp from each element
        override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = {
          maxTs = maxTs.max(element._2 * 1000L)
          element._2 * 1000L
        }
      }
      /* Equivalent built-in assigner:
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(5)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = element._2 * 1000
      }
      */
    )

    val result = tsDS
      .keyBy(0)
      // 5s tumbling window
      .timeWindow(Time.seconds(5))
      .apply {
        (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => {
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }")
        }
      }

    tsDS.print("water")
    result.print("windows:>>>")
    env.execute()
  }
}
```
From the result you can see that window [10, 15) is triggered by the record with event time 20: after I typed 20 and waited a few seconds, the first window was emitted. The 10s interval is system time, and watermark = current max timestamp − delay; when window end <= watermark, the window fires.
The [15, 20) window is triggered by the record with event time 25, which also satisfies "window end <= watermark".
So what if the window has already fired, but some data is still arriving late? For that there's allowedLateness (accepting late data), which keeps adding late records to the appropriate window. Take a look at the code:
```scala
// The rest of the code is the same as the example above;
// there's just one extra line after opening the window
.keyBy(0)
.timeWindow(Time.seconds(5))
// What do these 2 seconds mean? See below
.allowedLateness(Time.seconds(2))
.apply { ... }
```
With allowedLateness(Time.seconds(2)), late records can still be processed after the window fires, as long as the watermark hasn't advanced past the window end plus the configured lateness. Without allowedLateness, records that arrive after the window fires are simply dropped.
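My understanding of the timing, sketched with the same numbers as the test above (5s window, 5s watermark delay, 2s allowed lateness):

```scala
// State for window [start, end) is kept until the watermark passes
// end + allowedLateness; each late element arriving before that point
// re-fires the window with an updated result.
def stateDroppedAt(windowEndSec: Long, allowedLatenessSec: Long): Long =
  windowEndSec + allowedLatenessSec

// Window [10s, 15s): event 20s -> watermark 15s -> first firing.
// A late 14s while the watermark is still below 17s -> fires again.
stateDroppedAt(15, 2) // = 17 -> past watermark 17s, state is gone and
                      //   late data is dropped (or side-output, see next)
```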
Data delays are never fully predictable. What if a record shows up even after the allowed-lateness period — say the 14 I typed after 22 in the test above? We can't stretch every window's wait for the occasional straggler, so there's also the "side output stream": late data is diverted into a side output. Look at the code:
```scala
// Just three new lines; the rest is the same as before
val outputTag = new OutputTag[(String, Long, Int)]("lateData") // new

val result = tsDS
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .allowedLateness(Time.seconds(2))
  .sideOutputLateData(outputTag) // new
  .apply { ... }

result.getSideOutput(outputTag).print("side>>>") // new
```
Punctuated (marker) watermarks
One thing leads to another — we got pulled into late data, so now back to the marker watermark. Why is it called a marker? Because generation doesn't depend on time but on when marker events are received. By default every record is a marker event, meaning every record generates a watermark; so when using this approach, you usually mark only certain events.
```scala
import java.text.SimpleDateFormat

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object WaterMarkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataDS = env.socketTextStream("bigdata101", 3456)

    val tsDS = dataDS.map(str => {
      val strings = str.split(",")
      (strings(0), strings(1).toLong, 1)
    }).assignTimestampsAndWatermarks(
      new AssignerWithPunctuatedWatermarks[(String, Long, Int)] {
        // Only elements containing "later" act as marker events and emit a watermark
        override def checkAndGetNextWatermark(lastElement: (String, Long, Int), extractedTimestamp: Long): Watermark = {
          if (lastElement._1.contains("later")) {
            println("Punctuated watermark generated.....")
            new Watermark(extractedTimestamp)
          } else {
            null // no watermark for non-marker elements
          }
        }
        override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = {
          element._2 * 1000L
        }
      }
    )

    val result = tsDS
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .apply {
        (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => {
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }")
        }
      }

    tsDS.print("water")
    result.print("calc")
    env.execute()
  }
}
```
My test results bear this out. Of course, even with markers, a high-TPS scenario would still generate a large number of watermarks, which puts some pressure on downstream operators. "So the punctuated approach to watermark generation is chosen only in scenarios with high real-time requirements."
About parallelism and watermarks
If you've been paying attention, you'll have noticed that all the cases above used a parallelism of 1, but in real production it certainly won't be 1. Does that change anything? Of course it does.
The conclusion first: "If the parallelism is not 1, windows are computed separately in each parallel subtask, and a window fires only once it has been triggered in all parallel subtasks."
Don't just take my word for it — let's look at the case. Here's the complete code this time:
```scala
import java.text.SimpleDateFormat

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * @description: simulates how a window fires under multiple degrees of parallelism
 * @author: Liu Jun Jun
 * @create: 2020-06-28
 */
object WaterMarkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Parallelism is NOT forced to 1 here; the default parallelism is the number of CPU cores
    //env.setParallelism(1)
    env.getConfig.setAutoWatermarkInterval(10000)

    val dataDS = env.socketTextStream("bigdata101", 3456)

    val tsDS = dataDS.map(str => {
      val strings = str.split(",")
      (strings(0), strings(1).toLong, 1)
    }).assignTimestampsAndWatermarks(
      new AssignerWithPeriodicWatermarks[(String, Long, Int)] {
        var maxTs: Long = 0
        override def getCurrentWatermark: Watermark = new Watermark(maxTs - 5000)
        override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = {
          maxTs = maxTs.max(element._2 * 1000L)
          element._2 * 1000L
        }
      }
    )

    // allowedLateness and the side output are removed here for simplicity
    val result = tsDS
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .apply {
        (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => {
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }")
        }
      }

    tsDS.print("water")
    result.print("calc")
    env.execute()
  }
}
```
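Before looking at the results, here is the rule behind that conclusion as I understand it: each parallel assigner instance advances its own watermark, and a downstream operator's event-time clock is the minimum of the watermarks on all of its input channels. A toy sketch of just that arithmetic (not Flink internals):

```scala
// A downstream operator only advances to the slowest input's watermark
def operatorWatermark(inputWatermarksMs: Seq[Long]): Long =
  inputWatermarksMs.min

operatorWatermark(Seq(10000L, 4000L))  // = 4000  -> window [5s,10s) can't fire yet
operatorWatermark(Seq(10000L, 10000L)) // = 10000 -> window [5s,10s) fires
```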
The results confirm the conclusion. OK, that's it for the basic principles of windows, time semantics, and watermarks. Now that you've understood these, look back at the four requirements at the beginning of this article.
So far we've only windowed the data; how data is processed inside a window hasn't been covered yet. The next chapter is on process functions and Flink stateful programming.
A good post I found while studying: https://www.cnblogs.com/rossiXYZ/p/12286407.html