Some of the operators and functions mentioned earlier can do some Time operations, but cannot get the current Processing Time or Watermark timestamp of the operator, which is simple to call but relatively limited in functionality. If you want to get the timestamp of the Watermark in a data stream, or move backwards and forwards in time, you need to use the ProcessFunction family of functions, which are the lowest level APIS in the Flink architecture and provide more fine-grained access to the data stream. Flink SQL is implemented based on these functions, which are also used by some highly personalized business scenarios.

At present, This series of functions mainly includes KeyedProcessFunction, ProcessFunction, CoProcessFunction, KeyedCoProcessFunction, ProcessJoinFunction, and ProcessWindo WFunction and other functions, these functions have different focuses, but the core functions are relatively similar, mainly including two points:

  • States: We can access and update Keyed State in these functions.

  • Timer: By setting Timer like an alarm clock, we can design more complex business logic in the time dimension.

For an introduction to state, see my Flink state Management article, where we’ll focus on several other features of using ProcessFunction. All the code for this article is uploaded to my github: github.com/luweizheng/…

How to use Timer

We can think of Timer as an alarm clock. Before using it, register a future time in Timer. When the time arrives, the alarm clock will “ring” and the program will execute a callback function, in which certain business logic will be executed. Here take KeyedProcessFunction as an example to introduce the registration and use of Timer.

ProcessFunction has two important interfaces, processElement and onTimer. The processElement function has the following Java signature in the source code:

// Process an element in the data stream
public abstract void processElement(I value, Context ctx, Collector<O> out)
Copy the code

The processElement method processes an element in the data stream and outputs it through Collector

. Context is what distinguishes it from FlatMapFunction and other ordinary functions. Developers can use Context to get the timestamp, access the TimerService, and set the Timer.

The other interface is onTimer:

// The callback function after the time has arrived
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
Copy the code

This is a callback function, and Flink will call onTimer when it is “alarm time” and perform some business logic. There’s also an argument OnTimerContext, which actually inherits the previous Context, which is almost the same as Context.

The main logic of using Timer is as follows:

  1. inprocessElementIn the methodContextRegister a future timestamp t. The semantics of this timestamp can be either Processing Time or Event Time, depending on the business requirements.
  2. inonTimerMethod to implement some logic, at time T,onTimerMethods are called automatically.

From the Context, we can get a TimerService, which is an interface to access the timestamp and Timer. We can through the Context. TimerService. RegisterProcessingTimeTimer or ` ` Context. TimerService. RegisterEventTimeTimer these two methods to register the Timer, All you need to do is pass in a timestamp. We can through the Context. TimerService. DeleteProcessingTimeTimer and Context timerService. DeleteEventTimeTimer to delete registered before the Timer. In addition, you can obtain the current timestamp: Context. TimerService. CurrentProcessingTime and Context timerService. CurrentWatermark `. As you can see from the function name, these are functions that occur in pairs, with two methods corresponding to two time semantics.

Note that we can only register a Timer on KeyedStream. Different timers can be registered with different timestamps under each Key, but only one Timer can be registered for each timestamp of each Key. If you wanted to apply a Timer on a DataStream, you could map all the data to a forged Key, but then all the data would flow into an operator subtask.

Let’s use the stock trading scenario again to explain how to use the Timer. A stock transaction includes: stock symbol, time stamp, stock price, volume. We now want to see if a stock has been rising in a row for 10 seconds, and if it has been rising, send an alert.

case class StockPrice(symbol: String, ts: Long, price: Double, volume: Int)

class IncreaseAlertFunction(intervalMills: Long)
extends KeyedProcessFunction[String.StockPrice.String] {

  // Status: Saves the last traded price of a stock
  lazy val lastPrice: ValueState[Double] =
  getRuntimeContext.getState(
    new ValueStateDescriptor[Double] ("lastPrice".Types.of[Double]))// State: saves the timer timestamp of a stock
  lazy val currentTimer: ValueState[Long] =
  getRuntimeContext.getState(
    new ValueStateDescriptor[Long] ("timer".Types.of[Long]))override def processElement(stock: StockPrice,
                              context: KeyedProcessFunction[String.StockPrice.String] #Context,
                              out: Collector[String) :Unit = {

    // Get the data in the lastPrice state, which is initialized to 0 the first time it is used
    val prevPrice = lastPrice.value()
    / / update the lastPrice
    lastPrice.update(stock.price)
    val curTimerTimestamp = currentTimer.value()
    if (prevPrice == 0.0) {
      // This is the first time to use it
    } else if (stock.price < prevPrice) {
      // If the price of the new incoming stock decreases, delete the Timer, otherwise the Timer is kept
      context.timerService().deleteEventTimeTimer(curTimerTimestamp)
      currentTimer.clear()
    } else if (stock.price >= prevPrice && curTimerTimestamp == 0) {
      // If the price of the new stock goes up
      // curTimerTimestamp 0 indicates that currentTimer is empty and there is no corresponding Timer
      // New Timer = Current time + interval
      val timerTs = context.timestamp() + intervalMills

      val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
      context.timerService().registerEventTimeTimer(timerTs)
      // Update currentTimer status, subsequent data will read currentTimer, make relevant judgment
      currentTimer.update(timerTs)
    }
  }

  override def onTimer(ts: Long,
                       ctx: KeyedProcessFunction[String.StockPrice.String] #OnTimerContext,
                       out: Collector[String) :Unit = {

    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

    out.collect("time: " + formatter.format(ts) + ", symbol: '" + ctx.getCurrentKey +
                " monotonically increased for " + intervalMills + " millisecond.")
    // Clear the currentTimer state
    currentTimer.clear()
  }
}
Copy the code

In the main logic, call KeyedProcessFunction with the following process operator:

val inputStream: DataStream[StockPrice] =...val warnings = inputStream
      .keyBy(stock => stock.symbol)
      Call the process function
      .process(new IncreaseAlertFunction(10000))
Copy the code

At Checkpoint, the Timer is also saved along with other state data. If you set some Timer using Processing Time semantics, the timestamp will expire upon restart, and those callback functions will be invoked immediately.

Side output SideOutput

Another feature of ProcessFunction is the ability to send part of the data to another stream, and the output to two streams can be of different data types. We mark the other stream with OutputTag[T]. Filter out some kind of data in ProcessFunction like this:

class IncreaseAlertFunction(intervalMills: Long) extends KeyedProcessFunction[String.Stock.String] {

  override def processElement(stock: Stock,
                              context: KeyedProcessFunction[String.Stock.String] #Context,
                              out: Collector[String) :Unit = {

    // Other business logic...
    // Define an OutputTag, Stock as the data type of the SideOutput stream
    val highVolumeOutput: OutputTag[Stock] = new OutputTag[Stock] ("high-volume-trade")

    if (stock.volume > 1000) {
      // Filter out the Stock and send it to the OutputTag
      context.output(highVolumeOutput, stock)
    }
  }
}
Copy the code

In the main logic, the side output is obtained by:

SideOutput / / collection
val outputTag: OutputTag[Stock] = OutputTag[Stock] ("high-volume-trade")
val sideOutputStream: DataStream[Stock] = mainStream.getSideOutput(outputTag)
Copy the code

As you can see from this example, KeyedProcessFunction has an output type of String and SideOutput has an output type of Stock, which can be different.

useProcessFunctionTo realize the Join

If you want to Join two data streams at a finer granularity, you can use CoProcessFunction or KeyedCoProcessFunction. Both of these functions have processElement1 and processElement2 methods that process each element of the first and second data flows, respectively. The data types and output types of the two data flows can be different. Although the data comes from two different streams, they can share the same state, so you can use the following logic to implement a Join:

  • Create one or more states that can be accessed by both data streams, using state A as an example.
  • processElement1Method processes the first data stream, updating state A.
  • processElement2Method processes the second data stream and generates the corresponding output based on the data in state A.

This time, we discuss stock price and media evaluation together. Assume that there is a media evaluation data stream for a certain stock, which contains positive and negative evaluations of the stock. Both streams flow into KeyedCoProcessFunction. The processElement2 method processes the incoming media data, updates the media evaluation to the state mediaState, and the processElement1 method processes the incoming stock trading data. Get the mediaState state to generate a new data stream. The two methods each process two data streams, share a state, and communicate by state.

In the main logic, we connect the two data streams, then keyBy according to the stock symbol, and then use the process operator:

val stockPriceRawStream: DataStream[StockPrice] =...val mediaStatusStream: DataStream[Media] =...val warnings = stockStream.connect(mediaStream)
      .keyBy(0.0)
      Call the process function
      .process(new AlertProcessFunction())
Copy the code

Concrete implementation of KeyedCoProcessFunction:

class JoinStockMediaProcessFunction extends KeyedCoProcessFunction[String.StockPrice.Media.StockPrice] {

  // mediaState
  private var mediaState: ValueState[String] = _

  override def open(parameters: Configuration) :Unit = {

    // Get the status from RuntimeContext
    mediaState = getRuntimeContext.getState(
      new ValueStateDescriptor[String] ("mediaStatusState", classOf[String))}override def processElement1(stock: StockPrice,
                               context: KeyedCoProcessFunction[String.StockPrice.Media.StockPrice] #Context,
                               collector: Collector[StockPrice) :Unit = {

    val mediaStatus = mediaState.value()
    if (null! = mediaStatus) {val newStock = stock.copy(mediaStatus = mediaStatus)
      collector.collect(newStock)
    }

  }

  override def processElement2(media: Media,
                               context: KeyedCoProcessFunction[String.StockPrice.Media.StockPrice] #Context,
                               collector: Collector[StockPrice) :Unit = {
    // The second stream updates mediaState
    mediaState.update(media.status)
  }

}
Copy the code

This example is relatively simple and does not use Timer. In actual business scenarios, Timer is generally used to clear expired states. Many Internet APP stitching machine learning sample is likely to be dependent on the function to achieve: machine learning is characterized by real time generated from the server, the user behavior on the APP is produced after the interaction, both belong to two different data streams, can according to this logic will be pieced together, the two data flow is obtained by joining together faster next round machine learning sample data. The intermediate data of the two data flows is placed in the state. In order to avoid the infinite growth of the state, Timer is needed to clear the expired state.

Note that if Event Time is used, both streams must be set to Watermark. If Event Time and Watermark are set for only one stream, the Timer function cannot be used in CoProcessFunction and KeyedCoProcessFunction. Because the process operator cannot determine at what time it should process the data.