Flink tutorials are constantly updated on my website at lulaoshi.info/flink/.
In stream processing applications, we often need to aggregate data over time windows; this is one of the most common problems a streaming job has to solve. Flink's window operators give us a convenient API: we can slice a data stream into windows and process the data inside each window. This article introduces how to compute windows in Flink.
The rough skeleton structure of a Flink window application is shown below:
```
// Keyed Window
stream
       .keyBy(...)                  <-  group by key
       .window(...)                 <-  assign elements in the stream to windows: WindowAssigner
      [.trigger(...)]               <-  specify a Trigger (optional)
      [.evictor(...)]               <-  specify an Evictor (optional)
       .reduce/aggregate/process()  <-  window function: Window Function

// Non-Keyed Window
stream
       .windowAll(...)              <-  no grouping; all elements are assigned to windows: WindowAssigner
      [.trigger(...)]               <-  specify a Trigger (optional)
      [.evictor(...)]               <-  specify an Evictor (optional)
       .reduce/aggregate/process()  <-  window function: Window Function
```
First, we need to decide whether to group the DataStream by key; this must be done before the window computation. A stream that has passed through keyBy is split into multiple logical groups, and multiple instances of the downstream operators can compute on them in parallel. windowAll does not group the stream: all data is sent to a single instance of the downstream operator. Once this decision is made, the subsequent window operations are essentially identical. This article mainly covers Keyed Windows, i.e. windows applied after keyBy; windows applied after windowAll are Non-Keyed Windows. Their principles and operations are similar, except that with windowAll all data is sent to one downstream instance, i.e. the downstream operator has a parallelism of 1.
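To make the contrast concrete, here is a minimal sketch using the StockPrice stream from the examples later in this article (the per-window sums are purely illustrative):

```scala
// Keyed Window: one 10-second result per symbol, computed in parallel
val perSymbolSum = input
  .keyBy(s => s.symbol)
  .timeWindow(Time.seconds(10))
  .reduce((s1, s2) => StockPrice(s1.symbol, s1.price + s2.price))

// Non-Keyed Window: one 10-second window over the whole stream,
// handled by a single downstream instance (parallelism 1)
val globalSum = input
  .timeWindowAll(Time.seconds(10))
  .reduce((s1, s2) => StockPrice(s1.symbol, s1.price + s2.price))
```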
There are two required operations in the skeleton of a Flink window application:

- A WindowAssigner, which assigns elements in the data stream to the appropriate windows.
- A Window Function (reduce, aggregate, or process), which processes the data in a window once the window's trigger condition is met.

Trigger and Evictor are optional extras for window triggering and cleanup, intended for advanced users who need more customization. If they are not set, the defaults are used.
The diagram above shows the life cycle of a window. Suppose we set up a 10-minute tumbling window: the first window starts at 0:00 and ends at 0:10, the next covers 0:10 to 0:20, and so on. As elements of the stream arrive, the WindowAssigner assigns them to the corresponding window based on time (Event Time or Processing Time). When a window meets its trigger condition, for example when its end time is reached, the corresponding Window Function is invoked. Note that this diagram is only a rough sketch; different window functions behave slightly differently.
In terms of data types, keyBy converts a DataStream into a KeyedStream, and window converts that into a WindowedStream. A window function such as reduce, aggregate, or process then performs the necessary aggregation on the windowed data and turns the result back into a DataStream.
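Spelled out with explicit type annotations, the progression looks like this (a sketch, reusing the StockPrice type from the examples below):

```scala
val stream: DataStream[StockPrice] = ...

// keyBy: DataStream -> KeyedStream, keyed by the symbol field (String)
val keyed: KeyedStream[StockPrice, String] = stream.keyBy(s => s.symbol)

// window: KeyedStream -> WindowedStream
val windowed: WindowedStream[StockPrice, String, TimeWindow] =
  keyed.timeWindow(Time.seconds(10))

// a window function turns the WindowedStream back into a DataStream
val result: DataStream[StockPrice] =
  windowed.reduce((s1, s2) => StockPrice(s1.symbol, s1.price + s2.price))
```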
WindowAssigner
Windows come in two flavors: time-based windows and count-based windows. This article focuses on time-based windows, represented in the Flink source code by TimeWindow. Each TimeWindow has a start time and an end time and covers a left-closed, right-open interval [start, end). Flink provides built-in WindowAssigners for tumbling, sliding, and session windows, and I'll explain how to use each of them next.
A count-based window forms windows according to the order in which elements arrive at the window. Since arrival order is not consistent with Event Time, the result of a count-based window is non-deterministic.
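For reference, a hedged sketch of count-based windows using the countWindow shorthand of the Scala API (assuming the keyed stock stream from the later examples):

```scala
// tumbling count window: fires after every 100 elements per key
input
  .keyBy(s => s.symbol)
  .countWindow(100)
  .reduce((s1, s2) => StockPrice(s1.symbol, s1.price + s2.price))

// sliding count window: size 100 elements, sliding by 10 elements
input
  .keyBy(s => s.symbol)
  .countWindow(100, 10)
  .reduce((s1, s2) => StockPrice(s1.symbol, s1.price + s2.price))
```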
Tumbling window
Under tumbling windows, windows do not overlap and the window length is fixed. We can use TumblingEventTimeWindows and TumblingProcessingTimeWindows to create tumbling time windows based on Event Time or Processing Time, respectively. The window length can be set in seconds, minutes, hours, or days using org.apache.flink.streaming.api.windowing.time.Time.
The following code shows how to use tumbling windows. In the last example, we add an offset on top of the fixed length. By default, time windows are aligned: a one-hour window covers 0:00:00.000-0:59:59.999. With a 15-minute offset, the window covers 0:15:00.000-1:14:59.999. Offsets are useful for handling time zones: if the system time is Greenwich Mean Time (UTC-0), China local time needs an offset of Time.hours(-8).
```scala
val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<window function>(...)

// tumbling processing-time windows
input
    .keyBy(...)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<window function>(...)

// 1 hour tumbling event-time windows offset by 15 minutes
input
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
    .<window function>(...)
```
Some code uses timeWindow instead of window, for example input.keyBy(...).timeWindow(Time.seconds(1)). timeWindow is a shorthand: when the execution environment is set to TimeCharacteristic.EventTime, Flink uses TumblingEventTimeWindows; when it is set to TimeCharacteristic.ProcessingTime, Flink uses TumblingProcessingTimeWindows.
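As a sketch, once the time characteristic is set, the two forms below are equivalent:

```scala
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// shorthand
input.keyBy(s => s.symbol).timeWindow(Time.seconds(1))

// what the shorthand expands to under EventTime semantics
input.keyBy(s => s.symbol).window(TumblingEventTimeWindows.of(Time.seconds(1)))
```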
Sliding window
A sliding window slides forward in fixed steps while the window length stays fixed. When using it we set two parameters, Size and Slide: the Slide determines how often a new window starts, so a small Slide produces a large number of windows. When Slide is smaller than Size, adjacent windows overlap and an event is assigned to multiple windows; when Slide is larger than Size, some events may be dropped.
As described earlier, we use the Time class to define Slide and Size, and an offset can also be set. Likewise, timeWindow is a shorthand that picks the appropriate window assigner based on the time semantics configured in the execution environment.
```scala
val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(...)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<window function>(...)

// sliding processing-time windows
input
    .keyBy(...)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<window function>(...)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(...)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<window function>(...)
```
Session window
A session window splits the stream into windows according to a session gap: when a window receives no new data for a period longer than the session gap, the window closes. In this mode the window length is variable, and each window's start and end times are not fixed. We can set a fixed session gap, or use a SessionWindowTimeGapExtractor to determine the session gap dynamically.
The following code shows how to build session windows with both fixed and dynamic gaps. The generic parameter T in SessionWindowTimeGapExtractor[T] is the type of the data stream, so we can derive the session gap from the elements of the stream.
```scala
val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(...)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<window function>(...)

// event-time session windows with dynamic gap
input
    .keyBy(...)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
      override def extract(element: T): Long = {
        // determine and return the session gap
      }
    }))
    .<window function>(...)

// processing-time session windows with static gap
input
    .keyBy(...)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<window function>(...)

// processing-time session windows with dynamic gap
input
    .keyBy(...)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
      override def extract(element: T): Long = {
        // determine and return the session gap
      }
    }))
    .<window function>(...)
```
Window functions
window and its WindowAssigner have now distributed the data into individual windows; next, we use window functions to process the data in each window. Window functions fall into two categories: incremental computation, such as reduce and aggregate, and full computation, such as process. With incremental computation, the window keeps a single piece of intermediate state; each time a new element arrives, it is combined with the intermediate state to produce new intermediate state, which is stored back in the window. With full computation, the window caches all of its elements and only computes over them once the trigger condition is met.
ReduceFunction
To use the reduce operator, we implement a ReduceFunction. As introduced in "DataStream API: keyBy, Reduce, and Aggregations", a ReduceFunction takes two inputs of the same type and produces one output of that same type, folding two elements into one new element. reduce on a window works the same way, except that the window additionally keeps one piece of state whose type equals the input type: the intermediate result of the previous reductions. When a new element arrives, the ReduceFunction combines the intermediate result with the new element to produce new data, which replaces the previous state.
```scala
case class StockPrice(symbol: String, price: Double)

val input: DataStream[StockPrice] = ...

senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

// the return type of reduce must equal its input type: StockPrice
val sum = input
    .keyBy(s => s.symbol)
    .timeWindow(Time.seconds(10))
    .reduce((s1, s2) => StockPrice(s1.symbol, s1.price + s2.price))
```
The code above folds pairs of elements with a lambda expression. Since we keyBy on the symbol field, data with the same symbol is grouped together. We then sum the price fields and return a result that is again of type StockPrice; returning any other type would cause an error.
The advantages of reduce are that the window state stays very small and a ReduceFunction is simple to implement, either as a lambda expression or by overriding the function. The drawback is that it is quite limited: the intermediate state, the input, and the output must all share the same type, and only a single piece of intermediate state is kept. When we want to operate on all the data in a window, one piece of intermediate state is far from enough.
AggregateFunction
AggregateFunction is also an incremental window function and likewise keeps only intermediate state, but it is more flexible, and slightly more complex, to use. Let's look at its definition in the source code:
```java
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

    // Create a new accumulator: the intermediate state data (ACC).
    // Usually called at initialization time.
    ACC createAccumulator();

    // When a new element arrives, fold it into the accumulator and
    // return the updated ACC.
    ACC add(IN value, ACC accumulator);

    // Merge two ACCs into one.
    ACC merge(ACC a, ACC b);

    // Convert the intermediate state into the final result.
    OUT getResult(ACC accumulator);
}
```
The input type is IN, the output type is OUT, and the intermediate state type is ACC. This seemingly complex design exists precisely to decouple the input, intermediate state, and output types. ACC is fully customizable: we can build whatever data structure we need into it. For example, to compute the average of a field over a window, the ACC stores both the sum and the count. Below is an example that computes an average:
```scala
case class StockPrice(symbol: String, price: Double)

// IN:  StockPrice
// ACC: (String, Double, Int) - (symbol, sum, count)
// OUT: (String, Double)      - (symbol, average)
class AverageAggregate extends AggregateFunction[StockPrice, (String, Double, Int), (String, Double)] {

  override def createAccumulator() = ("", 0d, 0)

  override def add(item: StockPrice, accumulator: (String, Double, Int)) =
    (item.symbol, accumulator._2 + item.price, accumulator._3 + 1)

  override def getResult(accumulator: (String, Double, Int)) =
    (accumulator._1, accumulator._2 / accumulator._3)

  override def merge(a: (String, Double, Int), b: (String, Double, Int)) =
    (a._1, a._2 + b._2, a._3 + b._3)
}

senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val input: DataStream[StockPrice] = ...

val average = input
    .keyBy(s => s.symbol)
    .timeWindow(Time.seconds(10))
    .aggregate(new AverageAggregate)
```
The workflow of these functions is as follows. Before any computation, createAccumulator creates a fresh ACC, which carries no meaningful data yet. As new data flows in, Flink calls add to fold each element into the ACC and returns the updated ACC as the intermediate state. When intermediate ACCs need to be combined, for instance when windows merge, Flink calls merge to produce a new ACC. Once everything has been merged into a single ACC, Flink calls getResult to produce the final result.
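To make the lifecycle concrete, here is a minimal standalone trace of the calls on the AverageAggregate defined above (run outside Flink, purely for illustration):

```scala
val agg = new AverageAggregate

var acc = agg.createAccumulator()              // ("", 0.0, 0)
acc = agg.add(StockPrice("BABA", 100.0), acc)  // ("BABA", 100.0, 1)
acc = agg.add(StockPrice("BABA", 102.0), acc)  // ("BABA", 202.0, 2)

val result = agg.getResult(acc)                // ("BABA", 101.0)
```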
ProcessWindowFunction
Unlike the previous two, ProcessWindowFunction caches the full contents of the window. Among all the Flink APIs, the process operators and their corresponding functions are the lowest-level implementations, exposing more low-level details such as direct access to state. It is defined in the source code as follows:
```java
/**
 * IN  input type
 * OUT output type
 * KEY the type of the key used by keyBy
 * W   the window type
 */
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {

    /**
     * Evaluate the window: the window's elements are cached in the Iterable,
     * and results are emitted through the Collector.
     */
    public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;

    /**
     * Called when the window is purged; delete all custom state data here.
     */
    public void clear(Context context) throws Exception {}

    /**
     * The window's context, holding window metadata, state data, and so on.
     */
    public abstract class Context implements java.io.Serializable {

        // Returns the window currently being processed
        public abstract W window();

        // Returns the current Processing Time
        public abstract long currentProcessingTime();

        // Returns the Watermark corresponding to the current Event Time
        public abstract long currentWatermark();

        // Returns the per-window state of this key
        public abstract KeyedStateStore windowState();

        // Returns the global state of this key
        public abstract KeyedStateStore globalState();

        // Emits late data to a side output
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}
```
When used, Flink caches all the elements of a window under a key in the Iterable&lt;IN&gt;; we process them and collect the output through the Collector&lt;OUT&gt;. Through the Context we can access more information about the window, including time, state, and a side output for late data.
The following code is a simple application of ProcessWindowFunction: we count how many times each price occurs within the window and output the most frequent one.
```scala
case class StockPrice(symbol: String, price: Double)

class FrequencyProcessFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {

  override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {

    // map from stock price to the number of times that price appeared
    val countMap = scala.collection.mutable.Map[Double, Int]()

    for (element <- elements) {
      val count = countMap.getOrElse(element.price, 0)
      countMap(element.price) = count + 1
    }

    // sort by number of occurrences, descending
    val sortedMap = countMap.toSeq.sortWith(_._2 > _._2)

    // emit the most frequent price to the Collector
    if (sortedMap.nonEmpty) {
      out.collect((key, sortedMap(0)._1))
    }
  }
}

senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val input: DataStream[StockPrice] = ...

val frequency = input
    .keyBy(s => s.symbol)
    .timeWindow(Time.seconds(10))
    .process(new FrequencyProcessFunction)
```
Context exposes two kinds of state. One is the per-key global state (globalState), which spans multiple windows and can be accessed from any of them. The other is the per-key, per-window state (windowState), which holds data only for the current window; it is mainly for scenarios where process is invoked multiple times on the same window, such as handling late data or custom Triggers. When using per-window state, remember to clean it up in the clear method.
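A hedged sketch of the two scopes (the class name CountProcessFunction and the state names are illustrative): the per-window counter sees only the current window, while the global counter accumulates across all windows of the same key.

```scala
class CountProcessFunction extends ProcessWindowFunction[StockPrice, String, String, TimeWindow] {

  override def process(key: String, context: Context,
                       elements: Iterable[StockPrice], out: Collector[String]): Unit = {
    // per-key, per-window state: visible only within this window
    val windowCount = context.windowState.getState(
      new ValueStateDescriptor[Int]("windowCount", classOf[Int]))
    // per-key global state: shared across all windows of this key
    val globalCount = context.globalState.getState(
      new ValueStateDescriptor[Int]("globalCount", classOf[Int]))

    windowCount.update(windowCount.value() + elements.size)
    globalCount.update(globalCount.value() + elements.size)
    out.collect(s"$key: window=${windowCount.value()} total=${globalCount.value()}")
  }

  // per-window state must be released when the window is purged
  override def clear(context: Context): Unit = {
    context.windowState.getState(
      new ValueStateDescriptor[Int]("windowCount", classOf[Int])).clear()
  }
}
```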
Compared with AggregateFunction and ReduceFunction, ProcessWindowFunction covers more scenarios and can solve more complex problems. However, it must store all the window's elements as state, which consumes a lot of storage, especially with large data volumes and many windows; misused, it can crash the whole application. For example, with terabytes of data per day, a sliding window with a Size of one hour and a Slide of ten minutes puts every element into six overlapping windows, each holding its own copy, which creates enormous memory pressure.
Combining ProcessWindowFunction with incremental computation
When we want to access the window's metadata but do not need to cache all of its data, we can combine ProcessWindowFunction with the incremental functions reduce and aggregate. For such a window, Flink computes incrementally and, just before the window closes, passes the incremental result to the ProcessWindowFunction as its input.
In the code below, the lambda function computes the running maximum and minimum price; this is the incremental part. The result, of type (String, Double, Double), is then passed to WindowEndProcessFunction, which simply attaches the window's end timestamp and emits a MaxMinPrice.
```scala
case class StockPrice(symbol: String, price: Double)
case class MaxMinPrice(symbol: String, max: Double, min: Double, windowEndTs: Long)

class WindowEndProcessFunction extends ProcessWindowFunction[(String, Double, Double), MaxMinPrice, String, TimeWindow] {

  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Double, Double)],
                       out: Collector[MaxMinPrice]): Unit = {
    // elements holds exactly one item: the result of the incremental reduce
    val maxMinItem = elements.head
    val windowEndTs = context.window.getEnd
    out.collect(MaxMinPrice(key, maxMinItem._2, maxMinItem._3, windowEndTs))
  }
}

senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val input: DataStream[StockPrice] = ...

// the return type of reduce must equal its input type,
// so we first map StockPrice into a triple (symbol, max, min)
val maxMin = input
    .map(s => (s.symbol, s.price, s.price))
    .keyBy(s => s._1)
    .timeWindow(Time.seconds(10))
    .reduce(
      (s1: (String, Double, Double), s2: (String, Double, Double)) =>
        (s1._1, Math.max(s1._2, s2._2), Math.min(s1._3, s2._3)),
      new WindowEndProcessFunction
    )
```
Trigger
A Trigger decides when to invoke the Window Function to process the data in the window and when to purge the window's data. An incremental window function aggregates each arriving element immediately; its Trigger decides when to emit the aggregated result. A full window function caches the window's elements; its Trigger decides when to compute over all of them and emit the result. Every window has a default Trigger; for example, the Processing Time windows in the examples above fire when the window's end time is reached. If we have custom triggering conditions, such as firing when certain elements appear in the window, when the element count reaches a threshold, or when elements arrive in a certain pattern, we can write our own Trigger. We can even put early-firing logic in a Trigger: under Event Time semantics, we can emit a provisional result before the Watermark arrives, getting results faster and at lower latency.
Let's first look at what a Trigger returns. When its condition is met, a Trigger returns a TriggerResult:
- CONTINUE: do nothing.
- FIRE: start the computation and emit the result downstream, without purging the window's data.
- PURGE: purge the window's data without computing.
- FIRE_AND_PURGE: start the computation, emit the result, and purge the window's data.
Before moving on to Trigger usage, let's look at Timers. Think of a Timer as an alarm clock: we register a future point in time, and when that time arrives, just like an alarm ringing, the program invokes a callback to perform some time-dependent task. In a custom Trigger, we need to design the timer-registration logic; when the registered time is reached, Flink calls back, starts the Window Function, and purges the window's data.
Each WindowAssigner comes with a default Trigger. For example, an Event Time window uses EventTimeTrigger, which fires (FIRE) once the window's Watermark passes the window's end time. Likewise, ProcessingTimeTrigger backs Processing Time windows, and CountTrigger backs count-based windows.
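When a built-in trigger fits, we can also attach it explicitly with trigger(). A hedged sketch: firing a one-minute event-time window every 100 elements with the built-in CountTrigger. Note that setting a trigger replaces the default one, so this window no longer fires on the Watermark:

```scala
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

val earlySums = input
  .keyBy(s => s.symbol)
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .trigger(CountTrigger.of(100))
  .reduce((s1, s2) => StockPrice(s1.symbol, s1.price + s2.price))
```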
When the built-in triggers do not meet our needs, we have to customize one. Let's look at Flink's Trigger source code first.
```java
/**
 * T the element type
 * W the window type
 */
public abstract class Trigger<T, W extends Window> implements Serializable {

    /**
     * Called for every element added to a window; returns a TriggerResult.
     */
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

    /**
     * Called when a registered processing-time Timer fires.
     */
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

    /**
     * Called when a registered event-time Timer fires.
     */
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

    /**
     * Returns true if this Trigger supports merging its state.
     */
    public boolean canMerge() {
        return false;
    }

    /**
     * Called when multiple windows are merged.
     */
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    /**
     * Called when the window's data is purged; clears all of the Trigger's state.
     */
    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    /**
     * Context: gives access to time, state, metrics, and timers.
     */
    public interface TriggerContext {

        // Returns the current Processing Time
        long getCurrentProcessingTime();

        // Returns the MetricGroup
        MetricGroup getMetricGroup();

        // Returns the current Watermark time
        long getCurrentWatermark();

        // Registers a processing-time Timer; onProcessingTime is called
        // when the system time reaches this point
        void registerProcessingTimeTimer(long time);

        // Registers an event-time Timer; onEventTime is called
        // when the Watermark reaches this time
        void registerEventTimeTimer(long time);

        // Deletes a registered processing-time Timer
        void deleteProcessingTimeTimer(long time);

        // Deletes a registered event-time Timer
        void deleteEventTimeTimer(long time);

        // Retrieves state scoped to the window and this Trigger
        <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
    }

    /**
     * Context for merging the Trigger state of multiple windows.
     */
    public interface OnMergeContext extends TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
    }
}
```
Let's use an early-firing example to explain how to write a custom Trigger. The default window length is 60 seconds; if the price falls by more than 5%, the Window Function fires immediately; if it falls by between 1% and 5%, the Window Function fires 10 seconds later.
```scala
class MyTrigger extends Trigger[StockPrice, TimeWindow] {

  override def onElement(element: StockPrice,
                         time: Long,
                         window: TimeWindow,
                         triggerContext: Trigger.TriggerContext): TriggerResult = {

    val lastPriceState: ValueState[Double] = triggerContext.getPartitionedState(
      new ValueStateDescriptor[Double]("lastPriceState", classOf[Double]))

    // default return value: CONTINUE
    var triggerResult: TriggerResult = TriggerResult.CONTINUE

    // On first use lastPriceState is empty and must be checked first.
    // The state is backed by Java code and returns null when empty;
    // with Scala's Double we need the check below to detect null.
    if (Option(lastPriceState.value()).isDefined) {
      if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.05) {
        // price dropped more than 5%: FIRE_AND_PURGE immediately
        triggerResult = TriggerResult.FIRE_AND_PURGE
      } else if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.01) {
        // price dropped 1%-5%: register a Timer at the next 10-second boundary
        val t = triggerContext.getCurrentProcessingTime +
          (10 * 1000 - (triggerContext.getCurrentProcessingTime % (10 * 1000)))
        triggerContext.registerProcessingTimeTimer(t)
      }
    }

    lastPriceState.update(element.price)
    triggerResult
  }

  // we do not use Event Time: always return CONTINUE
  override def onEventTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
  }

  override def clear(window: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
    val lastPrice: ValueState[Double] = triggerContext.getPartitionedState(
      new ValueStateDescriptor[Double]("lastPriceState", classOf[Double]))
    lastPrice.clear()
  }
}

senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val input: DataStream[StockPrice] = ...

val average = input
    .keyBy(s => s.symbol)
    .timeWindow(Time.seconds(60))
    .trigger(new MyTrigger)
    .aggregate(new AverageAggregate)
```
When a custom Trigger uses state, be sure to clean it up in the clear method; otherwise the state will keep accumulating as more and more windows are created.
Evictor
Evictor is an optional component, sitting on top of the WindowAssigner and Trigger, that evicts some of the window's elements. It can be invoked before or after the Window Function executes.
```java
/**
 * T the element type
 * W the window type
 */
public interface Evictor<T, W extends Window> extends Serializable {

    /**
     * Called before the Window Function.
     */
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * Called after the Window Function.
     */
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * The Evictor's context.
     */
    interface EvictorContext {
        long getCurrentProcessingTime();
        MetricGroup getMetricGroup();
        long getCurrentWatermark();
    }
}
```
evictBefore and evictAfter are called before and after the Window Function, respectively. All the window's elements are passed in as an Iterable&lt;TimestampedValue&lt;T&gt;&gt;, on which we implement our own eviction logic. Of course, an Evictor is unnecessary for incremental computation with ReduceFunction or AggregateFunction.
Flink provides several ready-made Evictor implementations:

- CountEvictor: keeps a fixed number of elements and evicts the excess from the front of the window, i.e. the oldest elements.
- TimeEvictor: keeps the elements within a given time span and evicts the elements before it.
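As a hedged sketch, an Evictor is attached via evictor() between the window assigner and the window function; here the built-in CountEvictor keeps only the last 10 elements of each window before the FrequencyProcessFunction defined earlier runs:

```scala
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor

val evicted = input
  .keyBy(s => s.symbol)
  .timeWindow(Time.seconds(10))
  .evictor(CountEvictor.of(10))
  .process(new FrequencyProcessFunction)
```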