Flink window background
Flink treats batch as a special case of streaming. Its underlying engine is a streaming engine, and both stream and batch processing are built on top of it. Windows are the bridge from streaming to batch. Put simply, a window cuts an unbounded stream into finite chunks, so computations can run over a bounded data set. Aggregations on a stream are scoped by windows, such as “the total for the last 10 minutes” or “the sum of the last 50 elements.” A window can be time-driven (a Time Window, e.g. every 30 s) or data-driven (a Count Window, e.g. every 100 elements). The DataStream API provides both time windows and count windows.
The rough skeleton of a Flink window application looks like this:
// Keyed Window
stream
       .keyBy(...)                  <-  group elements by key
       .window(...)                 <-  assign elements of the stream to the appropriate windows
      [.trigger(...)]               <-  specify a Trigger (optional)
      [.evictor(...)]               <-  specify an Evictor (optional)
       .reduce/aggregate/process()  <-  the window function that processes each window

// Non-Keyed Window
stream
       .windowAll(...)              <-  no grouping: all elements of the stream are assigned to the corresponding windows
      [.trigger(...)]               <-  specify a Trigger (optional)
      [.evictor(...)]               <-  specify an Evictor (optional)
       .reduce/aggregate/process()  <-  the window function that processes each window
The skeleton of a Flink window program has two required parts:
- A WindowAssigner, which assigns the elements of the data stream to the corresponding windows.
- A window function (reduce, aggregate, or process), which processes the data in a window once the window's trigger condition is met.
Tumbling window
Time-driven
A tumbling window splits the data into segments of a fixed length. Tumbling windows do not overlap, and every window has the same length. TumblingEventTimeWindows and TumblingProcessingTimeWindows create a tumbling time window based on event time or processing time, respectively. The window length can be set with org.apache.flink.streaming.api.windowing.time.Time, using seconds, minutes, hours, or days.
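The “no overlap” property is easier to see with a worked example. The plain-Java sketch below (no Flink dependency) maps a timestamp to the single tumbling window that contains it. The class and method names are hypothetical. The start-of-window arithmetic is our assumption of what Flink's `TimeWindow.getWindowStartWithOffset` computes; it is not copied from Flink.

```java
// Hypothetical plain-Java sketch: which tumbling window does a timestamp fall into?
final class TumblingWindowSketch {

    // Start of the window containing `timestamp`, for windows of length `size`
    // aligned to `offset` (all values in milliseconds).
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        // Java's % can be negative for timestamps before the offset, so normalize it.
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    public static void main(String[] args) {
        long size = 10_000L; // a 10-second window, like Time.seconds(10)
        long[] timestamps = {1_000L, 9_999L, 10_000L, 25_500L};
        for (long ts : timestamps) {
            long start = windowStart(ts, 0L, size);
            // Each element lands in exactly one window [start, start + size): no overlap.
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Running `main` puts 1000 and 9999 in `[0, 10000)`, 10000 in `[10000, 20000)`, and 25500 in `[20000, 30000)`. Window boundaries are aligned to the epoch (plus the offset), not to the first element that arrives.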
// Key the stream by the first tuple field (positional keys are deprecated in newer Flink versions)
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);
// Time-driven: a new window every 10 seconds
// (timeWindow() is deprecated since Flink 1.12; newer code uses
// .window(TumblingEventTimeWindows.of(...)) or .window(TumblingProcessingTimeWindows.of(...)))
WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow =
        keyedStream.timeWindow(Time.seconds(10));
// Count-driven: fire a window for every three elements with the same key
// WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow =
//         keyedStream.countWindow(3);
// apply() takes the window function, which is applied to the data of each window
timeWindow.apply(new MyTimeWindowFunction()).print();
// countWindow.apply(new MyCountWindowFunction()).print();
Count-driven
For example, to compute purchases per 100 users, the window fires each time it has collected 100 elements with the same key. Here is an implementation example:
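import java.text.SimpleDateFormat;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

// Sums the Integer field of all elements in a count window of the same key
public class MyCountWindowFunction
        implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {
    @Override
    public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input,
            Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        // Not a meaningful timestamp: a GlobalWindow always reports Long.MAX_VALUE,
        // because count windows do not depend on time.
        long maxTimestamp = window.maxTimestamp();
        out.collect("key:" + tuple.getField(0) + " value: " + sum + " | maxTimeStamp: "
                + maxTimestamp + ", " + format.format(maxTimestamp));
    }
}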
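The count-driven behaviour can also be modelled without Flink. The sketch below is a hypothetical plain-Java imitation of `keyBy(...).countWindow(size)`. As far as we know, Flink implements that call with GlobalWindows plus a purging CountTrigger. In the sketch, each key gets its own buffer, the window fires when that buffer holds `size` elements, and the buffer is then cleared. The class name and the `add` method are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a keyed, non-overlapping count window.
final class CountTumblingSketch {
    private final int size;
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    CountTumblingSketch(int size) {
        this.size = size;
    }

    // Adds one element; returns the window sum when this key's window fires, otherwise null.
    Integer add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < size) {
            return null; // this key's window is not full yet
        }
        int sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        buffer.clear(); // purge: the next element for this key starts a new window
        return sum;
    }

    public static void main(String[] args) {
        CountTumblingSketch windows = new CountTumblingSketch(3);
        String[] keys = {"a", "b", "a", "a", "b", "b"};
        int[] values = {1, 10, 2, 3, 20, 30};
        for (int i = 0; i < keys.length; i++) {
            Integer sum = windows.add(keys[i], values[i]);
            if (sum != null) {
                System.out.println("key:" + keys[i] + " value: " + sum);
            }
        }
    }
}
```

Because the buffers are kept per key, the third “a” fires with a sum of 6 and the third “b” fires with a sum of 60. Elements of other keys never count toward a key's window.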
Sliding window
A sliding window is a generalization of the tumbling window. It is defined by a fixed window size plus a slide interval: the window length is fixed, but windows can overlap. When using it, we set both slide and size. The slide determines how often Flink starts a new window, so a small slide produces many windows. When slide is smaller than size, adjacent windows overlap and an element is assigned to several windows. When slide is larger than size, there are gaps between windows, and elements that fall into a gap are dropped.
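To see both the overlap case and the dropped-element case, here is a hypothetical plain-Java sketch of sliding-window assignment. The loop is modelled on our reading of how Flink's sliding assigners enumerate windows, but it is a standalone illustration. It assumes a zero offset and non-negative timestamps. An element belongs to every window `[start, start + size)` that contains its timestamp, and window starts are spaced `slide` apart.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch: which sliding windows contain a timestamp?
final class SlidingWindowSketch {

    // Returns the start of every sliding window containing `timestamp` (offset assumed to be 0).
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp (timestamps assumed non-negative).
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards while the window [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // size 10 s, slide 5 s: the element at 12 s is in [10000, 20000) and [5000, 15000).
        System.out.println(windowStarts(12_000L, 10_000L, 5_000L));
        // size 5 s, slide 10 s: window [0, 5000) ends before 7 s, so the element is dropped.
        System.out.println(windowStarts(7_000L, 5_000L, 10_000L));
    }
}
```

The first call prints `[10000, 5000]`, so one element feeds two windows. The second prints `[]`, which is the “slide larger than size” case where the element falls into a gap.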
Time-based sliding window
// Time-driven: every 5 s, compute over the data of the last 10 s
// keybyed is a KeyedStream<Tuple3<String, String, String>, String>
WindowedStream<Tuple3<String, String, String>, String, TimeWindow> timeWindow =
        keybyed.timeWindow(Time.seconds(10), Time.seconds(5));
SingleOutputStreamOperator<String> applyed = timeWindow.apply(new WindowFunction<Tuple3<String, String, String>, String, String, TimeWindow>() {
    @Override
    public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, String>> input, Collector<String> out) throws Exception {
        Iterator<Tuple3<String, String, String>> iterator = input.iterator();
        StringBuilder sb = new StringBuilder();
        while (iterator.hasNext()) {
            Tuple3<String, String, String> next = iterator.next();
            sb.append(next.f0 + ".." + next.f1 + ".." + next.f2);
        }
        // Emit one line per window with all of its elements concatenated
        out.collect(sb.toString());
    }
});
Count-based sliding window
/**
 * Sliding windows: windows can overlap
 * 1. time-based
 * 2. count-based
 */
// Count-driven: every 2 elements of a key, compute over that key's last 3 elements
WindowedStream<Tuple3<String, String, String>, String, GlobalWindow> countWindow = keybyed.countWindow(3, 2);
SingleOutputStreamOperator<String> applyed = countWindow.apply(new WindowFunction<Tuple3<String, String, String>, String, String, GlobalWindow>() {
    @Override
    public void apply(String s, GlobalWindow window, Iterable<Tuple3<String, String, String>> input, Collector<String> out) throws Exception {
        Iterator<Tuple3<String, String, String>> iterator = input.iterator();
        StringBuilder sb = new StringBuilder();
        while (iterator.hasNext()) {
            Tuple3<String, String, String> next = iterator.next();
            sb.append(next.f0 + ".." + next.f1 + ".." + next.f2);
        }
        // Emit one line per window with all of its elements concatenated
        out.collect(sb.toString());
    }
});
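What `countWindow(3, 2)` actually emits is easiest to see by simulating it for a single key. Our understanding is that Flink builds this call from a CountTrigger, which fires every `slide` elements, and a CountEvictor, which keeps only the last `size` elements. The class below is a hypothetical plain-Java imitation of that behaviour, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of countWindow(size, slide) for one key.
final class CountSlidingSketch {
    private final int size;
    private final int slide;
    private final List<String> buffer = new ArrayList<>();
    private int sinceLastFire = 0;

    CountSlidingSketch(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    // Adds one element; returns the window contents when the window fires, otherwise null.
    List<String> add(String element) {
        buffer.add(element);
        sinceLastFire++;
        if (sinceLastFire < slide) {
            return null; // trigger condition (every `slide` elements) not met yet
        }
        sinceLastFire = 0;
        // Evict the oldest elements so that at most `size` remain in the window.
        while (buffer.size() > size) {
            buffer.remove(0);
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        CountSlidingSketch window = new CountSlidingSketch(3, 2);
        for (int i = 1; i <= 6; i++) {
            List<String> fired = window.add(String.valueOf(i));
            if (fired != null) {
                System.out.println(fired);
            }
        }
    }
}
```

Feeding the elements 1 through 6 fires three windows: `[1, 2]`, `[2, 3, 4]`, and `[4, 5, 6]`. The first window is smaller than 3 because the trigger fires before three elements have arrived, and consecutive windows share one element because the slide (2) is smaller than the size (3).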
Session time window
A session window groups a series of events that are separated by less than a specified timeout (the gap), similar to a session in a web application. When no new data arrives for the length of the gap, the current session window closes, and the next element starts a new window. In this mode the window length is variable, and the start and end time of each window are not known in advance. We can set a fixed session gap, or use a SessionWindowTimeGapExtractor to determine the gap dynamically for each element.
val input: DataStream[T] = ...
// event-time session windows with static gap
input
.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)
// event-time session windows with dynamic gap
input
.keyBy(...)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
override def extract(element: T): Long = {
// determine and return session gap
}
}))
.<window function>(...)
// processing-time session windows with static gap
input
.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)
// processing-time session windows with dynamic gap
input
.keyBy(...)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
override def extract(element: T): Long = {
// determine and return session gap
}
}))
.<window function>(...)
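The merging idea behind session windows can be sketched in plain Java. The sketch handles a static gap and a single key. Each element opens a window `[t, t + gap)`, and windows that overlap are merged into one session. Two choices here are assumptions of this illustration, not statements about Flink internals: windows that merely touch are also merged, and timestamps are sorted first so that out-of-order input gives the same result. With a dynamic gap, `gap` would be computed per element, much as `SessionWindowTimeGapExtractor.extract` does in the Scala example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java sketch of session-window merging with a static gap (one key).
final class SessionWindowSketch {

    // Returns sessions as {start, end} pairs (end exclusive) for the given event timestamps.
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long t : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t <= last[1]) {
                // [t, t + gap) overlaps or touches the current session; timestamps are sorted,
                // so t + gap is the new session end.
                last[1] = t + gap;
            } else {
                // Quiet period longer than the gap: start a new session.
                result.add(new long[] {t, t + gap});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] events = {30, 1, 14, 5, 35};
        for (long[] s : sessions(events, 10)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With a gap of 10, the events 1, 5, and 14 chain into one session `[1, 24)`. Nothing arrives between 14 and 30, so 30 and 35 form a second session `[30, 45)`. Neither session has a predetermined length.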
The window function
Incremental computation corresponds to reduce and aggregate, and full computation corresponds to process. With incremental computation, the window keeps a single piece of intermediate state. Each time a new element arrives, it is combined with that state to produce new intermediate state, which is stored back in the window. With full computation, the window buffers every element it receives and computes over all of them only once the trigger condition is met.
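The difference in window state can be shown by computing an average both ways. This is a standalone plain-Java sketch, not the Flink API. The `AverageAccumulator` class is hypothetical. It plays roughly the role of the accumulator in Flink's AggregateFunction, and its `add` and `getResult` methods loosely mirror that interface's names. `averageOfBuffer` stands in for a process-style function that sees the whole window at once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java contrast between incremental and full window computation.
final class WindowFunctionSketch {

    // Incremental: the window keeps only a small accumulator, updated as each element arrives.
    static final class AverageAccumulator {
        long sum;
        long count;

        void add(int value) {
            sum += value;
            count++;
        }

        double getResult() {
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    // Full: the window buffers every element and computes only when it fires.
    static double averageOfBuffer(List<Integer> buffered) {
        long sum = 0;
        for (int v : buffered) {
            sum += v;
        }
        return buffered.isEmpty() ? 0.0 : (double) sum / buffered.size();
    }

    public static void main(String[] args) {
        int[] window = {4, 8, 15, 16, 23, 42};
        AverageAccumulator acc = new AverageAccumulator(); // constant-size state
        List<Integer> buffer = new ArrayList<>();          // state grows with the window
        for (int v : window) {
            acc.add(v);
            buffer.add(v);
        }
        // Both give the same answer; they differ in how much state the window holds.
        System.out.println(acc.getResult() + " " + averageOfBuffer(buffer));
    }
}
```

Both approaches print 18.0 for this window. The incremental version stores two numbers however large the window gets. The full version keeps every element, which costs more state but lets the function inspect all elements together, for example to sort them or to read window metadata.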