Windows are at the heart of processing infinite (unbounded) data streams: they split the stream into finite-size buckets over which computations can be applied.

A windowed Flink program generally takes one of two shapes, depending on whether the stream is keyed or non-keyed. The only difference is that a keyed stream calls keyBy(…) and then window(…), whereas a non-keyed stream skips keyBy(…) and uses windowAll(…) in place of window(…).

Window lifecycle

A window is created as soon as the first element belonging to it arrives, and it is completely removed when the time (event time or processing time) passes its end timestamp plus the user-specified allowed lateness. Flink guarantees removal only for time-based windows, not for other types such as global windows. For example, with an event-time windowing strategy that creates non-overlapping windows every 5 minutes and an allowed lateness of 1 minute, Flink creates a new window for the interval between 12:00 and 12:05 when the first element with a timestamp in that interval arrives, and removes the window when the watermark passes the 12:06 timestamp.
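The removal rule above can be sketched in plain Java. This is only an illustration, not Flink code: the class WindowLifecycleSketch and its methods are hypothetical names, timestamps are milliseconds since midnight, and we assume the cleanup time is the last timestamp inside the window (end - 1) plus the allowed lateness.

```java
final class WindowLifecycleSketch {
    static final long MINUTE = 60_000L;

    /** Assumed cleanup time: last timestamp in the window (end - 1) plus allowed lateness. */
    static long cleanupTime(long windowEnd, long allowedLateness) {
        return (windowEnd - 1) + allowedLateness;
    }

    /** The window can be removed once the watermark reaches its cleanup time. */
    static boolean canBeRemoved(long windowEnd, long allowedLateness, long watermark) {
        return watermark >= cleanupTime(windowEnd, allowedLateness);
    }

    public static void main(String[] args) {
        long end = (12 * 60 + 5) * MINUTE;   // window [12:00, 12:05)
        long lateness = 1 * MINUTE;          // allowed lateness of 1 minute
        // watermark at 12:05: the window is still kept for late elements
        System.out.println(canBeRemoved(end, lateness, (12 * 60 + 5) * MINUTE));
        // watermark at 12:06: the window can be removed
        System.out.println(canBeRemoved(end, lateness, (12 * 60 + 6) * MINUTE));
    }
}
```

With the 12:00-12:05 window from the example, the first check prints false and the second prints true.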

In Flink, every window has a Trigger and a function (ProcessWindowFunction, ReduceFunction, AggregateFunction, or FoldFunction) attached to it. The function contains the computation to be applied to the contents of the window, while the trigger specifies the conditions under which the window is considered ready for the function to be applied. A triggering policy might be something like “when the number of elements in the window is more than four” or “when the watermark passes the end of the window”. A trigger can also decide to purge a window's contents at any time between its creation and removal. Purging in this case refers only to the elements in the window, not to the window metadata, so new elements can still be added to that window.

In addition, you can specify an Evictor, which can remove elements from the window after the trigger fires and before and/or after the function is applied.

Keyed and Non-Keyed Windows

Before defining the window, the first thing to decide is whether the stream should be keyed. Calling keyBy(…) splits the infinite stream into logical keyed streams; if keyBy(…) is not called, the stream is not keyed.

In a keyed stream, any attribute of the incoming events can be used as the key. Because each logical keyed stream can be processed independently of the others, a keyed stream allows the windowed computation to be performed in parallel by multiple tasks. All elements with the same key are sent to the same parallel task.

For non-keyed streams, the original stream is not split into multiple logical streams, and all the windowing logic is executed by a single task.

Window Assigners

Once we have decided whether the stream is keyed, the next step is to define a window assigner, which determines how elements are assigned to windows.

A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink ships with predefined window assigners for the most common use cases: tumbling windows, sliding windows, session windows, and global windows. We can also implement a custom window assigner by extending the WindowAssigner class. All built-in window assigners except the global windows assigner assign elements to windows based on time, which can be either processing time or event time.

A time-based window has a start timestamp (inclusive) and an end timestamp (exclusive), which together describe the size of the window. In code, Flink uses TimeWindow for time-based windows; it has methods for querying the start and end timestamps, plus a maxTimestamp() method that returns the largest timestamp allowed for a given window.

Tumbling Windows

A tumbling windows assigner assigns each element to a window of a specified size. Tumbling windows have a fixed size and do not overlap. For example, with a tumbling window of 5 minutes, the current window is evaluated and a new window is started every five minutes.

DataStream<T> input = ...;

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

As the last example shows, tumbling window assigners also take an optional offset parameter that changes the alignment of the windows. For example, without an offset, hourly tumbling windows are aligned with the epoch, giving windows such as 1:00:00.000-1:59:59.999, 2:00:00.000-2:59:59.999, and so on. With an offset of 15 minutes, the windows become 1:15:00.000-2:14:59.999, 2:15:00.000-3:14:59.999, and so on. In practice, a common use of the offset is to align windows to a time zone other than UTC-0, for example Time.hours(-8) for UTC+8.
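To make the effect of the offset concrete, here is a small plain-Java sketch of how the start of a tumbling window could be derived from an element timestamp. It is not Flink's implementation: TumblingWindowSketch and windowStart are hypothetical names, and all values are milliseconds.

```java
final class TumblingWindowSketch {
    /** Start (inclusive) of the tumbling window that contains the timestamp. */
    static long windowStart(long timestamp, long size, long offset) {
        // floorMod keeps the remainder non-negative, also for negative offsets
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long hour = 60 * minute;
        long t = hour + 20 * minute;                      // 1:20:00.000
        long plain = windowStart(t, hour, 0);             // no offset
        long shifted = windowStart(t, hour, 15 * minute); // 15-minute offset
        // print window bounds in minutes since 0:00
        System.out.println(plain / minute + " .. " + (plain + hour) / minute);
        System.out.println(shifted / minute + " .. " + (shifted + hour) / minute);
    }
}
```

For an element at 1:20 this prints 60 .. 120 (the 1:00-2:00 window) and 75 .. 135 (the 1:15-2:15 window), matching the example above.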

Sliding Windows

A sliding windows assigner also assigns elements to windows of fixed size. The window size is configured the same way as for tumbling windows, but there is an additional slide parameter that controls how frequently a new window is started. When the slide is smaller than the window size, sliding windows overlap, and an element is assigned to multiple windows.

For example, with windows of size 10 minutes that slide by 5 minutes, a new window is started every 5 minutes, and each window shares some of its elements with the previous window.

DataStream<T> input = ...;

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

As with tumbling windows, the optional offset parameter changes the alignment of sliding windows.
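The fact that one element lands in several overlapping windows can be sketched as follows. This is a simplified plain-Java illustration under our own assumptions, not the Flink assigner itself; SlidingWindowSketch and assignWindows are hypothetical names, and timestamps are milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {
    /** Returns the {start, end} pair of every sliding window that contains the timestamp. */
    static List<long[]> assignWindows(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>();
        // start of the latest window that still contains the timestamp
        long lastStart = timestamp - Math.floorMod(timestamp - offset, slide);
        // walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // element at minute 12, window size 10 minutes, slide 5 minutes
        for (long[] w : assignWindows(12 * minute, 10 * minute, 5 * minute, 0)) {
            System.out.println("[" + w[0] / minute + ", " + w[1] / minute + ")");
        }
    }
}
```

The element at minute 12 is assigned to the two windows [10, 20) and [5, 15).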

Session Windows

A session windows assigner groups elements by sessions of activity. In contrast to tumbling and sliding windows, session windows do not overlap and do not have a fixed start and end time. Instead, a session window closes when no new element has been received for a certain period of time, the session gap. The assigner can be configured either with a static session gap or with a function that determines the session gap dynamically.

DataStream<T> input = ...;

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

As shown above, a static gap is specified with Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on, while a dynamic gap is specified by implementing the SessionWindowTimeGapExtractor interface.

Note: Because session windows do not have a fixed start and end time, they are evaluated differently from tumbling and sliding windows. Internally, the session window operator creates a new window for each arriving element and merges windows together if the elements are closer to each other than the defined session gap. To be mergeable, a session window operator requires a merging Trigger and a merging window function, such as a ReduceFunction, AggregateFunction, or ProcessWindowFunction (a FoldFunction cannot merge).
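The merge step described in the note can be sketched in plain Java. This is a simplified illustration, not the operator's real code: SessionMergeSketch and sessions are hypothetical names, and we assume that two windows are merged only when the distance between the elements is strictly smaller than the gap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SessionMergeSketch {
    /** Builds one window [t, t + gap) per element, then merges overlapping windows. */
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> merged = new ArrayList<>();
        for (long t : sorted) {
            long[] candidate = {t, t + gap};
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && candidate[0] < last[1]) {
                last[1] = Math.max(last[1], candidate[1]); // extend the current session
            } else {
                merged.add(candidate);                     // start a new session
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // elements may arrive out of order; the gap is 10 time units
        for (long[] s : sessions(new long[] {1, 4, 30, 12, 33}, 10)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

Here the elements 1, 4 and 12 end up in one session [1, 22), and 30 and 33 in a second session [30, 43).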

Global Windows

A global windows assigner assigns all elements with the same key to the same single global window. This windowing scheme is only useful if we also specify a custom Trigger; otherwise no computation is ever performed, because the global window has no natural end at which the aggregated elements could be processed.

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);

Window Function

After defining the window assigner, we need to specify the computation to perform on each window. This is the job of the window function, which processes the elements of each window once the system determines that the window is ready for processing.

The window function can be a ReduceFunction, AggregateFunction, FoldFunction, or ProcessWindowFunction. The first two can be executed more efficiently because Flink aggregates the elements of each window incrementally as they arrive. A ProcessWindowFunction instead receives an Iterable over all the elements in the window, together with additional meta information about the window to which the elements belong.

A ProcessWindowFunction cannot be executed as efficiently because Flink has to buffer all elements of a window internally before invoking the function. This can be mitigated by combining a ProcessWindowFunction with a ReduceFunction, AggregateFunction, or FoldFunction, which gives us both the incremental aggregation of the window elements and the additional window metadata that the ProcessWindowFunction receives.
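The difference in state size between the two styles can be illustrated without Flink. In this plain-Java sketch (IncrementalVsBufferedSketch and its methods are hypothetical names), the buffered style keeps every element until the window fires, while the incremental style only keeps a running sum and count.

```java
import java.util.ArrayList;
import java.util.List;

final class IncrementalVsBufferedSketch {
    /** Buffered style: every element is kept and the result is computed at the end. */
    static double bufferedAverage(List<Long> buffer) {
        long sum = 0;
        for (long v : buffer) {
            sum += v;
        }
        return (double) sum / buffer.size();
    }

    /** Incremental style: the state is just {sum, count}, updated per element. */
    static long[] add(long[] acc, long value) {
        return new long[] {acc[0] + value, acc[1] + 1};
    }

    static double result(long[] acc) {
        return (double) acc[0] / acc[1];
    }

    public static void main(String[] args) {
        List<Long> buffer = new ArrayList<>();
        long[] acc = {0L, 0L};
        for (long v : new long[] {4, 8, 15, 16, 23, 42}) {
            buffer.add(v);      // state grows with every element
            acc = add(acc, v);  // state stays at two numbers
        }
        System.out.println(bufferedAverage(buffer) + " from " + buffer.size() + " buffered elements");
        System.out.println(result(acc) + " from an accumulator of " + acc.length + " values");
    }
}
```

Both styles produce the same average, but the buffered state grows with the number of elements in the window, which is the cost the paragraph above refers to.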

ReduceFunction

ReduceFunction is used to specify how to combine two elements in the input stream to produce an output element of the same type. Flink uses ReduceFunction to incrementally aggregate the elements in the window.

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
      // sum the second field of the two tuples
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });

AggregateFunction

An AggregateFunction is a generalized version of a ReduceFunction with three types: an input type (IN), an accumulator type (ACC), and an output type (OUT). The AggregateFunction interface has methods for creating an initial accumulator, adding an input element to an accumulator, merging two accumulators into one, and extracting the output from an accumulator.

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());

FoldFunction

A FoldFunction specifies how an input element of the window is combined with an element of the output type. It is called incrementally for each element added to the window, together with the current output value.

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("", new FoldFunction<Tuple2<String, Long>, String>() {
       // append the Long field of each element to the accumulated String
       public String fold(String acc, Tuple2<String, Long> value) {
         return acc + value.f1;
       }
    });

Note: fold() cannot be used with session windows or other mergeable windows.

ProcessWindowFunction

A ProcessWindowFunction receives an Iterable containing all the elements of the window and a Context object with access to time and state information, which makes it more flexible than the other window functions. This flexibility comes at the cost of higher performance overhead and resource consumption, because the elements cannot be aggregated incrementally and must be buffered until the window is ready for processing.

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

    /**
     * The context holding window metadata.
     */
    public abstract class Context implements java.io.Serializable {
        /**
         * Returns the window that is being evaluated.
         */
        public abstract W window();

        /** Returns the current processing time. */
        public abstract long currentProcessingTime();

        /** Returns the current event-time watermark. */
        public abstract long currentWatermark();

        /**
         * State accessor for per-key and per-window state.
         *
         * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
         * by implementing {@link ProcessWindowFunction#clear(Context)}.
         */
        public abstract KeyedStateStore windowState();

        /**
         * State accessor for per-key global state.
         */
        public abstract KeyedStateStore globalState();
    }

}

The key parameter is the key extracted by the KeySelector that was specified in keyBy(). With tuple-index keys or string field references, the type of this key is always Tuple, and it has to be cast manually to a tuple of the correct size to extract the key fields.

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(t -> t.f0)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    // count the elements buffered for this window
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + " count: " + count);
  }
}

ProcessWindowFunction with Incremental Aggregation

As mentioned earlier, a ProcessWindowFunction can be combined with a ReduceFunction, AggregateFunction, or FoldFunction, so that window elements are aggregated incrementally as they arrive while the ProcessWindowFunction still provides the additional window meta information.

Incremental Window Aggregation with ReduceFunction

The following example shows how to combine an incremental ReduceFunction with a ProcessWindowFunction to return the smallest event in a window along with the start time of the window.

DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      // keep the reading with the smaller value
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

  public void process(String key, Context context, Iterable<SensorReading> minReadings, Collector<Tuple2<Long, SensorReading>> out) {
      // the Iterable holds exactly one element: the pre-aggregated minimum
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  }
}

Incremental Window Aggregation with AggregateFunction

Example: Calculate the average value of elements and output both the key value and the average value.

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

  public void process(String key, Context context, Iterable<Double> averages, Collector<Tuple2<String, Double>> out) {
      // the Iterable holds exactly one element: the pre-aggregated average
      Double average = averages.iterator().next();
      out.collect(new Tuple2<>(key, average));
  }
}

Incremental Window Aggregation with FoldFunction

Example: Returns the number of events in a window, along with the key value and the window end time.

DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .fold(new Tuple3<String, Long, Integer>("", 0L, 0), new MyFoldFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyFoldFunction
    implements FoldFunction<SensorReading, Tuple3<String, Long, Integer>> {

  public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
      // field 2 of the accumulator holds the running count
      Integer cur = acc.getField(2);
      acc.setField(cur + 1, 2);
      return acc;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {

  public void process(String key, Context context, Iterable<Tuple3<String, Long, Integer>> counts, Collector<Tuple3<String, Long, Integer>> out) {
    Integer count = counts.iterator().next().getField(2);
    out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(), count));
  }
}

Triggers

A Trigger determines when a window is ready to be processed by the window function. Each WindowAssigner in Flink comes with a default Trigger. If the default trigger does not fit our needs, we can specify a custom one with trigger(…).

The Trigger interface contains the following five methods:

  • The onElement() method is called for each element that is added to a window.
  • The onEventTime() method is called when a registered event-time timer fires.
  • The onProcessingTime() method is called when a registered processing-time timer fires.
  • The onMerge() method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
  • Finally, the clear() method performs any action needed upon removal of the corresponding window.
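To illustrate how onElement() and clear() work together, here is a stripped-down count-based trigger in plain Java. It does not implement Flink's Trigger interface: CountTriggerSketch and its Result enum are hypothetical stand-ins, and the timer callbacks are left out because a pure count trigger does not need them.

```java
final class CountTriggerSketch {
    /** Hypothetical stand-in for a trigger decision. */
    enum Result { CONTINUE, FIRE }

    private final long maxCount;
    private long count; // per-window trigger state

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Called for every element added to the window. */
    Result onElement() {
        count++;
        if (count >= maxCount) {
            count = 0;           // start counting again after firing
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    /** Called when the window is removed: drop the trigger state. */
    void clear() {
        count = 0;
    }

    public static void main(String[] args) {
        CountTriggerSketch trigger = new CountTriggerSketch(3);
        for (int i = 1; i <= 7; i++) {
            System.out.println("element " + i + " -> " + trigger.onElement());
        }
    }
}
```

With a threshold of 3, the sketch fires on the third and sixth element and continues otherwise. A trigger like this is what gives a global window a point at which its function is evaluated.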

Evictors

Flink's windowing model allows us to specify an optional Evictor in addition to the WindowAssigner and the Trigger. An Evictor can remove elements from a window after the trigger fires and before and/or after the window function is applied. To do so, the Evictor interface has two methods:

/**
 * Optionally evicts elements. Called before windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

/**
 * Optionally evicts elements. Called after windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

Flink provides us with three predefined Evictors:

  • CountEvictor: keeps up to a user-specified number of elements in the window and discards the remaining ones from the beginning of the window buffer.
  • DeltaEvictor: takes a DeltaFunction and a threshold, computes the delta between the last element in the window buffer and each of the remaining elements, and removes the elements whose delta is greater than or equal to the threshold.
  • TimeEvictor: takes an interval in milliseconds as its argument; for a given window, it finds the maximum timestamp max_ts among its elements and removes all elements whose timestamp is smaller than max_ts - interval.

By default, all predefined Evictors apply their logic before the window function is invoked.
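The count-based and time-based rules described above can be sketched in plain Java. This only mirrors the description in the list, it is not the Flink classes: EvictorSketch, evictByCount and evictByTime are hypothetical names, and the window buffer is modelled as a simple list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class EvictorSketch {
    /** Count-based: keep the last maxCount elements, drop the rest from the front. */
    static <T> List<T> evictByCount(List<T> buffer, int maxCount) {
        int from = Math.max(0, buffer.size() - maxCount);
        return new ArrayList<>(buffer.subList(from, buffer.size()));
    }

    /** Time-based: drop timestamps strictly smaller than (max timestamp - interval). */
    static List<Long> evictByTime(List<Long> timestamps, long interval) {
        List<Long> kept = new ArrayList<>();
        if (timestamps.isEmpty()) {
            return kept;
        }
        long cutoff = Collections.max(timestamps) - interval;
        for (long ts : timestamps) {
            if (ts >= cutoff) {
                kept.add(ts);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(evictByCount(Arrays.asList("a", "b", "c", "d", "e"), 3));
        System.out.println(evictByTime(Arrays.asList(100L, 450L, 900L, 1000L), 500L));
    }
}
```

The first call keeps [c, d, e]; the second keeps [900, 1000], because the cutoff is 1000 - 500 = 500.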

Allowed Lateness

When working with event-time windows, elements can arrive late, i.e. the watermark that Flink uses to track the progress of event time has already passed the end timestamp of the window to which the element belongs.

By default, late elements are dropped once the watermark has passed the end of the window. However, Flink allows us to specify a maximum allowed lateness for a window: the amount of time by which elements may be late before they are dropped. It defaults to 0. Elements that arrive after the watermark has passed the end of the window but before it passes the end of the window plus the allowed lateness are still added to the window. Depending on the trigger used, a late but not dropped element can cause the window to fire again, as is the case with EventTimeTrigger.

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);
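The decision between on-time, late but accepted, and dropped can be sketched in plain Java. This is our own simplification, not Flink code: LatenessSketch, Fate and classify are hypothetical names, and we assume the comparison is made against the last timestamp inside the window (end - 1).

```java
final class LatenessSketch {
    enum Fate { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

    /** Classifies an element of the window ending at windowEnd for the given watermark. */
    static Fate classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;
        if (watermark < maxTimestamp) {
            return Fate.ON_TIME;            // the window has not ended yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Fate.LATE_BUT_ACCEPTED;  // still within the allowed lateness
        }
        return Fate.DROPPED;                // the window has already been removed
    }

    public static void main(String[] args) {
        long end = 300_000L;      // window ends at minute 5
        long lateness = 60_000L;  // allowed lateness of 1 minute
        System.out.println(classify(end, lateness, 200_000L));
        System.out.println(classify(end, lateness, 320_000L));
        System.out.println(classify(end, lateness, 360_000L));
        System.out.println(classify(end, 0L, 300_000L)); // default lateness of 0
    }
}
```

The four calls print ON_TIME, LATE_BUT_ACCEPTED, DROPPED and DROPPED. With the default lateness of 0 there is no middle category, so every late element is dropped.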

Side Output

Flink's side output feature lets us obtain a stream of the elements that were discarded as late. To get it, specify sideOutputLateData(OutputTag) on the windowed stream, then call getSideOutput(OutputTag) on the result of the windowed operation, as shown below.

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);

Understand Flink Windows


You are welcome to follow my WeChat official account, gao Gaomu, to be the first to read my latest posts, share experience, and grow together.