The previous four articles provided detailed explanations of the operators commonly used in Flink and included numerous use cases:

  • Flink basic single-stream transformations: map, filter, flatMap
  • Flink key-based grouping transformations: keyBy, reduce, and aggregations
  • Flink multi-stream transformations: union and connect
  • Flink parallelism and data redistribution

In summary, using a Flink operator means supplying custom logic, either as a Lambda expression or by implementing a function class and overriding its methods. This article walks through parts of the Flink source code and gives concrete examples of each approach.

The function class

For map, flatMap, reduce, and similar methods, we can implement MapFunction, FlatMapFunction, ReduceFunction, and the other corresponding interfaces. Each of these function classes has generic type parameters that define the data types of the function’s input and output. We implement the interface and override its method with our own logic. Take FlatMapFunction, which corresponds to flatMap, as an example. It is defined in the source code as:

public interface FlatMapFunction<T, O> extends Function, Serializable {

	void flatMap(T value, Collector<O> out) throws Exception;
  
}

This interface extends Flink’s Function interface. It is a functional interface with a single abstract method (SAM), which makes it usable with Java 8 Lambda expressions. It also extends Serializable, because these functions are shipped to the TaskManagers at run time: they are serialized before being sent and deserialized on arrival. When using these functions, make sure that everything inside the function can be serialized. If the function holds something that cannot be serialized, either use the Rich function classes described later, or override Java’s serialization and deserialization methods.
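The serialization requirement can be checked locally before a job is submitted. The sketch below uses only plain Java serialization and no Flink classes; `Helper`, `EagerFunction`, `LazyFunction`, and `isSerializable` are hypothetical names introduced for illustration. It also shows a common Scala idiom, `@transient lazy val`, which is an alternative to the two options mentioned above and is not taken from the Flink source.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationSketch {
  // Hypothetical helper that is NOT Serializable (think of a parser or a client object).
  class Helper {
    def normalize(s: String): String = s.trim.toLowerCase
  }

  // Keeps the helper in an ordinary field, so it is serialized together with the function.
  final class EagerFunction extends Serializable {
    val helper = new Helper
    def apply(s: String): String = helper.normalize(s)
  }

  // Marks the helper @transient and lazy: it is skipped during serialization
  // and created on first use.
  final class LazyFunction extends Serializable {
    @transient lazy val helper = new Helper
    def apply(s: String): String = helper.normalize(s)
  }

  // Returns true if plain Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new EagerFunction)) // false
    println(isSerializable(new LazyFunction))  // true
  }
}
```

A check like this only approximates what happens when a job is shipped to a cluster, but it catches the most common mistake: a non-serializable object stored in a field of the function.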

Looking more closely at FlatMapFunction, it has two generic types, T and O: T is the input type and O is the output type. When using FlatMapFunction, we set both types accordingly. The two parameters of the method correspond to these generic types: value is the input of flatMap and has type T, while out is the output of flatMap, into which we write data of type O.

Here we implement FlatMapFunction and override flatMap so that only strings longer than limit are split into words:

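import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Implement FlatMapFunction; words are split only if the string is longer than limit
class WordSplitFlatMap(limit: Int) extends FlatMapFunction[String, String] {
  override def flatMap(value: String, out: Collector[String]): Unit = {
    // split returns an Array;
    // emit each element with out.collect to flatten the result
    if (value.size > limit) {
      value.split(" ").foreach(out.collect)
    }
  }
}

// Assumption: senv is the job's StreamExecutionEnvironment (not defined in the original snippet)
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val dataStream: DataStream[String] = senv.fromElements("Hello World", "Hello this is Flink")

val function = dataStream.flatMap(new WordSplitFlatMap(10))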

The Collector gathers the function’s output: each call to out.collect emits one record, so a single input can produce zero, one, or many outputs.
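To see the collector pattern in isolation, here is a minimal sketch in plain Scala with no Flink dependency. `SimpleCollector`, `SimpleFlatMap`, `WordSplit`, and `run` are hypothetical stand-ins written for this illustration; they mimic the shape of Flink’s Collector and FlatMapFunction but are not Flink’s implementation.

```scala
import scala.collection.mutable.ArrayBuffer

object CollectorSketch {
  // Hypothetical stand-in for Flink's Collector: it only gathers emitted values.
  trait SimpleCollector[O] {
    def collect(record: O): Unit
  }

  // Hypothetical stand-in for FlatMapFunction[T, O].
  trait SimpleFlatMap[T, O] {
    def flatMap(value: T, out: SimpleCollector[O]): Unit
  }

  // Same logic as WordSplitFlatMap: split only sentences longer than `limit`.
  class WordSplit(limit: Int) extends SimpleFlatMap[String, String] {
    override def flatMap(value: String, out: SimpleCollector[String]): Unit =
      if (value.length > limit) value.split(" ").foreach(out.collect)
  }

  // Feeds every input to the function and returns everything it emitted.
  def run[T, O](inputs: Seq[T], f: SimpleFlatMap[T, O]): Seq[O] = {
    val buffer = ArrayBuffer.empty[O]
    val out = new SimpleCollector[O] {
      override def collect(record: O): Unit = buffer += record
    }
    inputs.foreach(in => f.flatMap(in, out))
    buffer.toList
  }

  def main(args: Array[String]): Unit = {
    val words = run(Seq("Hello World", "Hello this is Flink"), new WordSplit(15))
    println(words) // List(Hello, this, is, Flink)
  }
}
```

With a limit of 15, the first sentence (11 characters) emits nothing and the second emits four words, which shows why the method returns Unit: the number of outputs is decided by how often collect is called.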

Lambda expressions

When you don’t need to deal with very complex business logic, Lambda expressions can be a better choice because they make your code more compact. Both Java 8 and Scala support Lambda expressions very well.

Flink’s Scala source code has three definitions for flatMap. Let’s take a look at the first one:

def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R] = {...}

Here flatMap takes input of generic type T and produces output of generic type R. It receives a Lambda expression named fun of the form (T, Collector[R]) => {…}.

Continuing with the word-splitting example, the Lambda expression is:

val lambda = dataStream.flatMap {
  (value: String, out: Collector[String]) => {
    if (value.size > 10) {
      value.split(" ").foreach(out.collect)
    }
  }
}

Then let’s look at the second definition in the source code:

def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = {...}

Unlike the previous Lambda expression, this one takes a generic T as input and returns a TraversableOnce[R], that is, a collection of R values. Instead of emitting results through a Collector, we return a list directly and Flink flattens it for us. Because the return type is TraversableOnce, we must return a list in every case, even if it is empty; otherwise the function signature cannot be matched.

// Only sentences longer than 15 characters are processed
val longSentenceWords = dataStream.flatMap {
  input => {
    if (input.size > 15) {
      // The return type is TraversableOnce, so the result must be a collection:
      // convert Array[String] to Seq[String]
      input.split(" ").toSeq
    } else {
      // Return an empty list when there is nothing to emit; otherwise the return type would not match TraversableOnce
      Seq.empty
    }
  }
}
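The flattening behaviour can be tried out on ordinary Scala collections, whose flatMap has the same shape. The sketch below is only an analogy on a local Seq, not Flink code; `FlattenSketch` and `splitLong` are names made up for this illustration.

```scala
object FlattenSketch {
  // Same lambda shape as above: one String in, a (possibly empty) Seq[String] out.
  val splitLong: String => Seq[String] = input =>
    if (input.length > 15) input.split(" ").toSeq else Seq.empty

  def main(args: Array[String]): Unit = {
    val sentences = Seq("Hello World", "Hello this is Flink")
    println(sentences.map(splitLong))     // nested: one inner sequence per input
    println(sentences.flatMap(splitLong)) // flattened: List(Hello, this, is, Flink)
  }
}
```

Comparing map with flatMap makes the role of the empty list visible: the short sentence contributes an empty inner sequence, which simply disappears after flattening.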

When using Lambda expressions, it is worth getting used to IntelliJ IDEA’s type checking and matching features. In this case, for example, if the returned value is not a TraversableOnce, IntelliJ IDEA marks the line in red to tell us that the input or output type does not match.

In addition, there is a third, Scala-only way to use Lambda expressions. To keep the Java and Scala APIs consistent, some Scala-specific features are not part of the standard API but are bundled into an extension package. This API accepts partial functions that pattern-match on the input, combined with the case keyword, to describe the data more semantically:

val data: DataStream[(String, Long, Double)] = [...]
data.flatMapWith {
  case (symbol, timestamp, price) => // ...
}

To use this API, add the following import:

import org.apache.flink.streaming.api.scala.extensions._

This approach gives the input fields names and types, which makes the code easier to read while keeping the brevity of functional programming. Most Spark operators support this style by default, so Spark users should note the difference when migrating to Flink. mapWith, filterWith, keyingBy, and reduceWith support it as well.
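The readability gain is easy to see on a plain Scala collection, where the same case syntax works without any extension import. This is a sketch on a local Seq rather than a DataStream; the object name, the sample records, and the price threshold are all invented for illustration.

```scala
object CaseSketch {
  // Hypothetical sample records: (symbol, timestamp, price).
  val quotes: Seq[(String, Long, Double)] =
    Seq(("FLNK", 1L, 12.5), ("FLNK", 2L, 9.0), ("SPRK", 3L, 20.0))

  // Positional access: the reader must remember what _1 and _3 mean.
  val byPosition: Seq[String] =
    quotes.flatMap(q => if (q._3 > 10.0) Seq(q._1) else Seq.empty)

  // Pattern-matching function: each field gets a descriptive name.
  val byName: Seq[String] =
    quotes.flatMap { case (symbol, _, price) =>
      if (price > 10.0) Seq(symbol) else Seq.empty
    }

  def main(args: Array[String]): Unit = {
    println(byPosition == byName) // true
    println(byName)               // List(FLNK, SPRK)
  }
}
```

Both versions compute the same result; the second one documents the tuple layout in the code itself.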

With flatMapWith, the earlier word-splitting example becomes:

val flatMapWith = dataStream.flatMapWith {
  case (sentence: String) => {
    if (sentence.size > 15) {
      sentence.split(" ").toSeq
    } else {
      Seq.empty
    }
  }
}

Rich function class

In addition to the two approaches above, Flink also provides Rich function classes. As the name suggests, each one adds the Rich prefix to an ordinary function class, for example RichMapFunction, RichFlatMapFunction, or RichReduceFunction. Compared with an ordinary function class, a Rich function class adds:

  • open() method: Flink executes this method before the operator is first called, so it can be used for initialization.
  • close() method: Flink executes this method after the last call to the operator, so it can be used to release resources.
  • getRuntimeContext method: returns the runtime context. Each parallel operator subtask has its own runtime context, which holds information about the running operator, including its current parallelism, the subtask index, broadcast data, accumulators, and metrics. Most importantly, we can retrieve state from the context.
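The call order described in this list can be modelled in a few lines of plain Scala. The sketch below is a simplified model under stated assumptions, not Flink’s runtime: `RichLike`, `Tracing`, and `runSubtask` are hypothetical names, open() takes no arguments here, and a single subtask processes all records.

```scala
import scala.collection.mutable.ArrayBuffer

object LifecycleSketch {
  // Hypothetical stand-in for a Rich function: lifecycle hooks plus the per-record method.
  trait RichLike[T, O] {
    def open(): Unit = ()
    def close(): Unit = ()
    def flatMap(value: T, emit: O => Unit): Unit
  }

  // Records the order in which the hooks are invoked.
  class Tracing(log: ArrayBuffer[String]) extends RichLike[String, String] {
    override def open(): Unit = log += "open"
    override def close(): Unit = log += "close"
    override def flatMap(value: String, emit: String => Unit): Unit = {
      log += s"flatMap($value)"
      value.split(" ").foreach(emit)
    }
  }

  // Simplified model of one parallel subtask: open once, process records, close once.
  def runSubtask[T, O](inputs: Seq[T], f: RichLike[T, O]): Seq[O] = {
    val out = ArrayBuffer.empty[O]
    f.open()
    try {
      inputs.foreach(in => f.flatMap(in, o => out += o))
    } finally {
      f.close()
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    val log = ArrayBuffer.empty[String]
    runSubtask(Seq("a b", "c"), new Tracing(log))
    println(log.mkString(", ")) // open, flatMap(a b), flatMap(c), close
  }
}
```

Because open() runs once per subtask rather than once per record, it is the natural place for expensive setup such as creating connections, and close() is the matching place to release them.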

We can look at the function signature in the source code:

public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT>

It implements the FlatMapFunction interface and also extends AbstractRichFunction. AbstractRichFunction is an abstract class with a RuntimeContext member variable and methods such as open, close, and getRuntimeContext.

Let’s try RichFlatMapFunction with an accumulator. On a single machine, we can keep a running total with a simple for loop. In a distributed environment, however, the computation is spread across multiple nodes and each node processes only part of the data, so a simple loop is not enough. An accumulator is a mechanism provided by big data frameworks that lets us aggregate statistics across nodes.

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Use RichFlatMapFunction
// An accumulator is added
class WordSplitRichFlatMap(limit: Int) extends RichFlatMapFunction[String, String] {
  // Create an accumulator
  val numOfLines: IntCounter = new IntCounter(0)

  override def open(parameters: Configuration): Unit = {
    // Register the accumulator in the RuntimeContext
    getRuntimeContext.addAccumulator("num-of-lines", this.numOfLines)
  }

  override def flatMap(value: String, out: Collector[String]): Unit = {
    // Update the accumulator for every input record
    this.numOfLines.add(1)
    if (value.size > limit) {
      value.split(" ").foreach(out.collect)
    }
  }
}

val richFunction = dataStream.flatMap(new WordSplitRichFlatMap(10))

val jobExecuteResult = senv.execute("basic flatMap transformation")

// Get the result of the accumulator after execution
val lines: Int = jobExecuteResult.getAccumulatorResult("num-of-lines")
println("num of lines: " + lines)
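The idea behind an accumulator, local counting followed by a merge, can be sketched without a cluster. The code below is a conceptual model in plain Scala and not how Flink implements accumulators; `Counter`, `countLines`, and the sample partitions are hypothetical and stand in for per-subtask counters and the final merged result.

```scala
object AccumulatorSketch {
  // Hypothetical stand-in for an accumulator: a local counter that can absorb another one.
  final class Counter {
    private var value = 0
    def add(n: Int): Unit = value += n
    def merge(other: Counter): Unit = value += other.value
    def get: Int = value
  }

  // Each "subtask" sees only its own partition and counts it locally;
  // the partial counters are merged once all partitions are done.
  def countLines(partitions: Seq[Seq[String]]): Int = {
    val partials = partitions.map { lines =>
      val local = new Counter
      lines.foreach(_ => local.add(1))
      local
    }
    val total = new Counter
    partials.foreach(total.merge)
    total.get
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq("Hello World"), Seq("Hello this is Flink", "Hello Flink"))
    println(countLines(partitions)) // 3
  }
}
```

No subtask ever sees the full data set, yet the merged total is correct, which is the property the for loop on a single machine cannot provide once the data is partitioned.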