This is the 8th day of my participation in the August More Text Challenge. For details, see: the August More Text Challenge.

Main text

Why do we need windows?

In stream processing, we cannot directly compute a sum, average, maximum, or minimum over the whole stream, because the data is produced continuously and has no boundary. To compute such statistics at all, we have to restrict the computation to a scope, either a period of time or a number of elements, for example "the sum of the last 5 minutes" or "the sum of the last 100 elements". In other words, aggregations on a stream must be scoped by a window.

What is a window?

Windows are a way of splitting an infinite data stream into finite chunks for processing.

Windows can be time-driven (Time Window, e.g., every 30 seconds) or data-driven (Count Window, e.g., every 100 elements).

1. Introduction to basic types of Windows

Windows are usually divided into the following types:

• Tumbling windows: fixed-size windows that do not overlap
• Sliding windows: fixed-size windows produced by sliding over the stream, so they may overlap
• Session windows: windows bounded by gaps of inactivity; rarely used in practice

2. Flink window introduction

Application of the Time Window

A time window can be either a tumbling window or a sliding window, and both are created with the same timeWindow method: passing one parameter (the window size) creates a tumbling window, while passing two parameters (the window size and the slide interval) creates a sliding window.
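For example, a minimal sketch of the difference (the object name TimeWindowSketch is made up for illustration; the host, port, and window sizes simply mirror the examples later in this post):

package com.shockang.study.bigdata.flink.window

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object TimeWindowSketch {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // Each socket line is expected to be an integer; map it to (constant key, value)
    val stream: DataStream[(Int, Int)] = environment
      .socketTextStream("node01", 9000)
      .map(x => (1, x.toInt))

    // One parameter: a tumbling time window of 5 seconds
    stream.keyBy(0).timeWindow(Time.seconds(5)).sum(1).print()

    // Two parameters: a sliding window 10 seconds long that slides every 5 seconds
    stream.keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5)).sum(1).print()

    environment.execute("timeWindowSketch")
  }
}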

The application of the Count Window

A count window (countWindow) can likewise be a tumbling window or a sliding window, and the calling convention is the same: passing one parameter creates a tumbling count window, while passing two parameters creates a sliding count window.
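A similar sketch for count windows, assuming the same stream as in the previous sketch (the sizes 100 and 10 are arbitrary):

// One parameter: a tumbling count window that fires every 100 elements
stream.keyBy(0).countWindow(100).sum(1).print()

// Two parameters: a sliding count window of 100 elements, evaluated every 10 elements
stream.keyBy(0).countWindow(100, 10).sum(1).print()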

The application of custom windows

If timeWindow and countWindow are not enough, we can also use custom windows for data statistics and other purposes, as sketched below.
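As a sketch of the idea: the generic window method on a keyed stream accepts any WindowAssigner, whether one of the built-in assigners or your own implementation. The example below reuses the stream from the earlier sketches and picks built-in processing-time session windows with a 30-second gap, both of which are assumptions for illustration:

import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

// A session window that closes once a key has seen no new data for 30 seconds
stream
  .keyBy(0)
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
  .sum(1)
  .print()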

3. Numerical aggregation statistics of Windows

For numeric statistics within a window, we can use either incremental aggregation or full-window aggregation.

Practice

Incremental aggregate statistics:

• reduce(ReduceFunction)
• aggregate(AggregateFunction)
• sum(), min(), max()
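The convenience aggregations are the simplest of these; a short sketch, again assuming the stream from the earlier sketches:

// sum/min/max aggregate one field (selected by position here) inside each window
stream.keyBy(0).timeWindow(Time.seconds(5)).sum(1).print()   // sum of the values
stream.keyBy(0).timeWindow(Time.seconds(5)).min(1).print()   // minimum value
stream.keyBy(0).timeWindow(Time.seconds(5)).max(1).print()   // maximum value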

Requirement: receive input data from a socket and compute the accumulated sum of the data every 5 seconds.

Code implementation:

package com.shockang.study.bigdata.flink.window

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkTimeCount {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // Read lines from the socket; each line is expected to be an integer
    val socketStream: DataStream[String] = environment.socketTextStream("node01", 9000)
    val print: DataStreamSink[(Int, Int)] = socketStream
      .map(x => (1, x.toInt))               // map every value to (constant key, value)
      .keyBy(0)                             // key by the constant so all data lands in one group
      .timeWindow(Time.seconds(5))          // tumbling time window of 5 seconds
      .reduce(new ReduceFunction[(Int, Int)] {
        // Incrementally accumulate the sum of the second field
        override def reduce(t: (Int, Int), t1: (Int, Int)): (Int, Int) = {
          (t._1, t._2 + t1._2)
        }
      })
      .print()

    environment.execute("startRunning")
  }
}
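The reduce example above produces a running sum. The same incremental style can compute an average with aggregate(AggregateFunction), which keeps a separate accumulator type. The class below (AvgAggregate, an illustrative name) is a sketch, not part of the original example:

import org.apache.flink.api.common.functions.AggregateFunction

// Accumulator = (sum of values, number of values); it is updated per element,
// so the window never has to buffer all of its elements
class AvgAggregate extends AggregateFunction[(Int, Int), (Long, Long), Double] {
  override def createAccumulator(): (Long, Long) = (0L, 0L)
  override def add(value: (Int, Int), acc: (Long, Long)): (Long, Long) =
    (acc._1 + value._2, acc._2 + 1)
  override def getResult(acc: (Long, Long)): Double =
    acc._1.toDouble / acc._2
  override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)
}

// Usage, on the same kind of keyed stream as in FlinkTimeCount:
// socketStream.map(x => (1, x.toInt)).keyBy(0).timeWindow(Time.seconds(5)).aggregate(new AvgAggregate).print()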

Full aggregate statistics:

Full-window aggregation waits until the window fires, i.e. until all of the data belonging to the window has been collected, and only then computes the result. It is suitable for statistics that need to see the whole window at once, such as the maximum, minimum, or average of the collected data. The entry points are:

• apply(WindowFunction)
• process(ProcessWindowFunction)

ProcessWindowFunction provides more context information than WindowFunction.

Requirement: compute the average of every 3 elements using full-window aggregation.

package com.shockang.study.bigdata.flink.window

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

object FlinkCountWindowAvg {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    val socketStream: DataStream[String] = environment.socketTextStream("node01", 9000)
    // Calculate the average value of the data in each window
    val socketDatas: DataStreamSink[Double] = socketStream
      .map(x => (1, x.toInt))
      .keyBy(0)
      //.timeWindow(Time.seconds(10))
      .countWindow(3)                       // tumbling count window of 3 elements
      // Compute the average of the window using the process method
      .process(new MyProcessWindowFunctionclass)
      .print()
    // The execute method must be called, otherwise the program will not run
    environment.execute("count avg")
  }
}

/**
 * ProcessWindowFunction takes four type parameters:
 * the input type, the output type, the key type, and the window type
 */
class MyProcessWindowFunctionclass extends ProcessWindowFunction[(Int, Int), Double, Tuple, GlobalWindow] {
  override def process(key: Tuple, context: Context, elements: Iterable[(Int, Int)], out: Collector[Double]): Unit = {
    var totalNum = 0   // number of elements in the window
    var countNum = 0   // sum of the values in the window
    for (data <- elements) {
      totalNum += 1
      countNum += data._2
    }
    // Use floating-point division so the average is not truncated
    out.collect(countNum.toDouble / totalNum)
  }
}