Source code analysis: a look at Flink's Broadcast through examples and source code

0x00 Abstract

This article walks you through Flink's broadcast variable mechanism using examples and source code analysis.

0x01 Business Requirements

1. Scenario requirements

We need to detect and filter out IP addresses that appear in a blacklist. The contents of the blacklist can change at any time, so it must be dynamically configurable.

Assume this blacklist is stored in MySQL. When the Flink job starts, it loads the blacklist from MySQL, and the Flink operators then use it as a variable.

2. Problem

We do not want to restart the job just to reload this variable, so we need a way to dynamically update the variable inside the operators.

3. Solutions

Use Flink's broadcast mechanism to implement dynamic configuration updates.

The difference between a broadcast stream and an ordinary data stream is that each element of a broadcast stream is delivered to every parallel instance (partition) of the downstream operator, whereas an element of an ordinary stream is processed by only one of them. This property makes broadcast streams well suited for dynamic configuration updates.

0x02 Overview

There are three tricky parts to broadcast: the usage steps, how to write the custom processing function, and how to access the broadcast state. Here is an overview of each.

1. Procedure for using broadcast

  • Create a MapStateDescriptor
  • Call DataStream.broadcast to obtain a BroadcastStream
  • Call DataStream.connect to connect the business DataStream with the BroadcastStream, which returns a BroadcastConnectedStream
  • Call BroadcastConnectedStream.process with a function whose processElement and processBroadcastElement methods handle the two streams respectively (an end-to-end sketch follows this list)
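
To make the four steps concrete, here is a minimal, self-contained Java sketch that applies them to the blacklist scenario from 0x01. The class name, the fromElements sample data and the anonymous BroadcastProcessFunction are illustrative assumptions, not code from the article's example.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastBlacklistSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // In a real job these would come from Kafka/MySQL; fromElements is only for illustration
    DataStream<String> businessStream = env.fromElements("1.2.3.4", "5.6.7.8");  // incoming IPs
    DataStream<String> blacklistStream = env.fromElements("5.6.7.8");            // blacklist updates

    // Step 1: create the MapStateDescriptor that describes the broadcast state
    final MapStateDescriptor<String, String> blacklistDesc = new MapStateDescriptor<>(
        "blacklist", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    // Step 2: broadcast the configuration stream
    BroadcastStream<String> broadcast = blacklistStream.broadcast(blacklistDesc);

    // Step 3: connect the business stream with the broadcast stream
    BroadcastConnectedStream<String, String> connected = businessStream.connect(broadcast);

    // Step 4: process both streams; processElement handles business data,
    // processBroadcastElement handles blacklist updates
    connected.process(new BroadcastProcessFunction<String, String, String>() {
      @Override
      public void processElement(String ip, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        // forward only IPs that are not in the broadcast blacklist
        if (!ctx.getBroadcastState(blacklistDesc).contains(ip)) {
          out.collect(ip);
        }
      }

      @Override
      public void processBroadcastElement(String ip, Context ctx, Collector<String> out) throws Exception {
        // remember the latest blacklist entry
        ctx.getBroadcastState(blacklistDesc).put(ip, ip);
      }
    }).print();

    env.execute("broadcast-blacklist-sketch");
  }
}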

2. User-defined processing functions

  • BroadcastConnectedStream.process accepts two kinds of functions: KeyedBroadcastProcessFunction and BroadcastProcessFunction
  • Both define the abstract methods processElement and processBroadcastElement; in addition, KeyedBroadcastProcessFunction defines an onTimer method, which is a no-op by default and can be overridden by subclasses
  • processElement processes the business data stream
  • processBroadcastElement processes the broadcast data stream

3. Broadcast State

  • Broadcast state is always represented as MapState, the most general state primitive that Flink provides. It is a kind of managed state, i.e. state managed by the Flink framework, like ValueState, ListState, MapState, etc.
  • The user must create a MapStateDescriptor to obtain the corresponding state handle. It holds the name of the state, the type of the values the state holds, and possibly user-specified functions
  • Broadcast state is checkpointed just like other state
  • At runtime broadcast state is kept in memory only; there is no RocksDB state backend for broadcast state
  • Flink broadcasts the state to every task; the state is not propagated across tasks, and changes to it are visible only within the task that made them (see the access sketch after this list)
  • Downstream tasks may receive the broadcast elements in different orders, so be careful if your processing depends on their arrival order
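
The point about changes being local to a task is reflected in the API: processBroadcastElement receives a Context whose getBroadcastState returns a writable BroadcastState, while processElement receives a ReadOnlyContext that only returns a ReadOnlyBroadcastState; giving write access only to the broadcast side is what keeps the state copies on all parallel tasks identical. A minimal sketch of the two access paths, where the state name "config" and the String types are placeholder assumptions:

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ReadWriteAccessSketch extends BroadcastProcessFunction<String, String, String> {

  private final MapStateDescriptor<String, String> configDesc = new MapStateDescriptor<>(
      "config", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

  @Override
  public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
    // Context exposes a writable BroadcastState: every parallel instance applies the same update
    BroadcastState<String, String> state = ctx.getBroadcastState(configDesc);
    state.put("latest", value);
  }

  @Override
  public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
    // ReadOnlyContext exposes a read-only view: the business side cannot modify broadcast state
    ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(configDesc);
    out.collect(value + " / " + state.get("latest"));
  }
}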

0x03 Sample Code

1. Sample code

An ideal example can be found directly in the Flink source code. The following code comes from StatefulJobWBroadcastStateMigrationITCase in the Flink tests; I have added comments to it.

  @Test
  def testRestoreSavepointWithBroadcast(): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // The following two descriptors define the types of data kept in broadcast state.
    // A broadcast stream can carry several kinds of broadcast state at the same time.
    lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
      "broadcast-state-1",
      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])

    lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
      "broadcast-state-2",
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO)

    env.setStateBackend(new MemoryStateBackend)
    env.enableCheckpointing(500)
    env.setParallelism(4)
    env.setMaxParallelism(4)

    // The data stream and the broadcast stream use the same kind of source, CheckpointedSource.
    // The data stream then goes through a series of operators, such as flatMap.
    val stream = env
      .addSource(
        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
      .keyBy(
        new KeySelector[(Long, Long), Long] {
          override def getKey(value: (Long, Long)): Long = value._1
        }
      )
      .flatMap(new StatefulFlatMapper)
      .keyBy(
        new KeySelector[(Long, Long), Long] {
          override def getKey(value: (Long, Long)): Long = value._1
        }
      )

    // The broadcast stream
    val broadcastStream = env
      .addSource(
        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")
      .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)

    // Connect the data stream with the broadcast stream
    stream
      .connect(broadcastStream)
      .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))
      .addSink(new AccumulatorCountingSink)
  }
}

// User-defined processing function
class TestBroadcastProcessFunction
  extends KeyedBroadcastProcessFunction
    [Long, (Long, Long), (Long, Long), (Long, Long)] {

  // The same two MapStateDescriptors (firstBroadcastStateDesc, secondBroadcastStateDesc) are declared
  // again here so that BroadcastState can be accessed from both processBroadcastElement and
  // processElement, i.e. so that variables can be passed between the two methods. Any new
  // MapStateDescriptor would work, as long as the same descriptor is recognized by both methods.

  // The first parameter, "broadcast-state-1", is the state name. Flink uses this name to look up
  // the state belonging to this MapStateDescriptor in the runtime.
  lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
    "broadcast-state-1",
    BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
    BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])

  val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
    "broadcast-state-2",
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO)

  override def processElement(
                value: (Long, Long),
                ctx: KeyedBroadcastProcessFunction
                [Long, (Long, Long), (Long, Long), (Long, Long)]#ReadOnlyContext,
                out: Collector[(Long, Long)]): Unit = {

    // The Flink test simply forwards the business element it receives
    out.collect(value)
  }

  override def processBroadcastElement(
                value: (Long, Long),
                ctx: KeyedBroadcastProcessFunction
                [Long, (Long, Long), (Long, Long), (Long, Long)]#Context,
                out: Collector[(Long, Long)]): Unit = {
    // Store the latest broadcast element so that processElement can read and use it later.
    // The name of firstBroadcastStateDesc is used to locate the corresponding BroadcastState.
    ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
    ctx.getBroadcastState(secondBroadcastStateDesc).put(value._1.toString, value._2.toString)
  }
}

// Source used by both the broadcast stream and the data stream
private class CheckpointedSource(val numElements: Int)
  extends SourceFunction[(Long, Long)] with CheckpointedFunction {

  private var isRunning = true
  private var state: ListState[CustomCaseClass] = _

  // Simply emits the elements, then idles
  override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {
    ctx.emitWatermark(new Watermark(0))
    ctx.getCheckpointLock synchronized {
      var i = 0
      while (i < numElements) {
        ctx.collect((i, i))
        i += 1
      }
    }
    // don't emit a final watermark so that we don't trigger the registered event-time timers
    while (isRunning) Thread.sleep(20)
  }
}

2. Technical difficulties

MapStateDescriptor

First, a few concepts:

  • Flink has two basic kinds of state: Keyed State and Operator State.
  • Both Keyed State and Operator State can exist in two forms: raw state and managed state.
  • Managed state is state managed by the Flink framework, such as ValueState, ListState, MapState, etc.
  • Raw state is state whose data structure is managed by the user; the framework reads and writes it as byte[] and knows nothing about its internal structure.
  • MapState is a kind of managed state whose value is a map; the user adds elements via the put or putAll methods.

Returning to our example: the broadcast variable is a kind of Operator State and is stored as managed MapState. Concretely, this is implemented in DefaultOperatorStateBackend.getBroadcastState.

So broadcast state has to be described with a MapStateDescriptor. Its use is quite flexible: since it is a key/value structure, I find it convenient to use a class as the value type, especially for readers who are coming to Scala from another language (a sketch follows).
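
For instance, if the dynamic configuration carries more than one field, the value type can simply be a small class. A minimal sketch, assuming a hypothetical BlacklistConfig POJO and descriptor name that are not part of the article's example:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class ConfigDescriptorSketch {

  // Hypothetical configuration value class, not taken from the article's example
  public static class BlacklistConfig {
    public String ip;            // blacklisted IP
    public long expireAtMillis;  // when the entry stops being effective
    public BlacklistConfig() {}  // a Flink POJO needs a public no-arg constructor and public fields (or getters/setters)
  }

  // Key = IP string, value = the whole configuration object
  public static MapStateDescriptor<String, BlacklistConfig> buildDescriptor() {
    return new MapStateDescriptor<>(
        "blacklist-config",
        TypeInformation.of(String.class),
        Types.POJO(BlacklistConfig.class));
  }
}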

processBroadcastElement

// This function is relatively simple, because it is mainly used for control/configuration
override def processBroadcastElement(
    value: (Long, Long),
    ctx: KeyedBroadcastProcessFunction[Long, (Long, Long), (Long, Long), (Long, Long)]#Context,
    out: Collector[(Long, Long)]): Unit = {
  // Store the latest broadcast variable so that processElement can retrieve and use it later, e.g.
  ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
}

processElement

This function is used in conjunction with processBroadcastElement:

override def processElement(
    value: (Long, Long),
    ctx: KeyedBroadcastProcessFunction[Long, (Long, Long), (Long, Long), (Long, Long)]#ReadOnlyContext,
    out: Collector[(Long, Long)]): Unit = {
  // Here we can read back the broadcast variables stored earlier by processBroadcastElement
  // and use them while processing the business element, for example:
  val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
    "broadcast-state-2",
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO)

  var actualSecondState = Map[String, String]()
  for (entry <- ctx.getBroadcastState(secondBroadcastStateDesc).immutableEntries()) {
    // (in the original test each entry is also compared against an expected map)
    actualSecondState += (entry.getKey -> entry.getValue)
  }

  // Even though it cooperates with processBroadcastElement, the state can hold values of any type;
  // it does not have to match the type of the broadcast element. The key point is to declare the
  // corresponding MapStateDescriptor.
  // MapStateDescriptor extends StateDescriptor, where the state type is MapState and the value type is Map.
}

Use in combination

For various reasons, I can only use an example found on the Internet to show how the two methods work together.

// The pattern is always stored in MapState with null as the key. Broadcast state is always
// represented as MapState, the most general state primitive that Flink provides.
MapStateDescriptor<Void, Pattern> bcStateDescriptor =
  new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));

// When a broadcast element (a pattern) arrives, store it into BroadcastState
public void processBroadcastElement(Pattern pattern, Context ctx,
    Collector<Tuple2<Long, Pattern>> out) throws Exception {
  // store the new pattern by updating the broadcast state
  BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
  // storing in MapState with null as VOID default value
  bcState.put(null, pattern);
}

// When a business element arrives, the broadcast variable is read back from BroadcastState;
// internally the state is located by the descriptor name, the string "patterns".
public void processElement(Action action, ReadOnlyContext ctx,
    Collector<Tuple2<Long, Pattern>> out) throws Exception {
  // get current pattern from broadcast state
  Pattern pattern = ctx.getBroadcastState(this.patternDesc)
    // access MapState with null as VOID default value
    .get(null);
  // get previous action of current user from keyed state
  String prevAction = prevActionState.value();
  if (pattern != null && prevAction != null) {
    // user had an action before, check if pattern matches
    if (pattern.firstAction.equals(prevAction) &&
        pattern.secondAction.equals(action.action)) {
      // MATCH
      out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
    }
  }
  // update keyed state and remember action for next pattern evaluation
  prevActionState.update(action.action);
}

0x04 Flink Source Code Analysis

1. Logical flow of broadcast

 * The life cycle of Broadcast:
 * {@code
 *  - initialization logic --> combine the data stream and the broadcast stream into a BroadcastConnectedStream and convert it into the topology
 *
 *        +---->  businessStream = DataStream.filter.map...
 *        |       // the data stream carrying the business logic; businessStream is a plain DataStream
 *        +---->  broadcastStream = DataStream.broadcast(broadcastStateDesc)
 *        |       // broadcastStream is a BroadcastStream
 *        +---->  businessStream.connect(broadcastStream)
 *        |                     .process(new processFunction(broadcastStateDesc))
 *        |       // connect them into a BroadcastConnectedStream
 *        +----------> process @ BroadcastConnectedStream
 *        |                TwoInputStreamOperator<IN1, IN2, OUT> operator =
 *        |                    new CoBroadcastWithNonKeyedOperator<>(clean(function),
 *        |                        broadcastStateDescriptors);
 *        |                return transform(outTypeInfo, operator);
 *        |       // generate an operator of type TwoInputStreamOperator and perform a transform
 *        +----------------> transform @ BroadcastConnectedStream
 *        |                      transform = new TwoInputTransformation<>(
 *        |                          inputStream1.getTransformation(),  // business stream
 *        |                          inputStream2.getTransformation(),  // broadcast stream
 *        |                          functionName,                      // the user's UDF
 *        |                          operator,                          // CoBroadcastWithNonKeyedOperator
 *        |                          outTypeInfo);                      // output type
 *        |                      returnStream = new SingleOutputStreamOperator(transform);
 *        |                      getExecutionEnvironment().addOperator(transform)
 *        |       // the business stream, broadcast stream and operator are combined into a transformation and added to the env, completing the topology conversion
 *        |       // the final result is a SingleOutputStreamOperator
 * }

Data structures:

// BroadcastStream simply wraps a DataStream and records its corresponding state descriptors
public class BroadcastStream<T> {
	private final StreamExecutionEnvironment environment;
	private final DataStream<T> inputStream;
	private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
}

// BroadcastConnectedStream connects the business stream with the broadcast stream,
// and then generates the operator and the topology
public class BroadcastConnectedStream<IN1, IN2> {
	private final StreamExecutionEnvironment environment;
	private final DataStream<IN1> inputStream1;
	private final BroadcastStream<IN2> inputStream2;
	private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
}

The real computation happens in CoBroadcastWithNonKeyedOperator, which invokes the user's BroadcastProcessFunction:

public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
		extends AbstractUdfStreamOperator<OUT, BroadcastProcessFunction<IN1, IN2, OUT>>
		implements TwoInputStreamOperator<IN1, IN2, OUT> {

	private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
	private transient TimestampedCollector<OUT> collector;
	private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
	private transient ReadWriteContextImpl rwContext;
	private transient ReadOnlyContextImpl rContext;

	@Override
	public void processElement1(StreamRecord<IN1> element) throws Exception {
		collector.setTimestamp(element);
		rContext.setElement(element);
		// When the upstream delivers a new business element, the user-defined processElement is invoked.
		// At this point the previously stored broadcast configuration can be read and used to process the business element.
		userFunction.processElement(element.getValue(), rContext, collector);
		rContext.setElement(null);
	}

	@Override
	public void processElement2(StreamRecord<IN2> element) throws Exception {
		collector.setTimestamp(element);
		rwContext.setElement(element);
		// When the upstream delivers a broadcast element, the user-defined processBroadcastElement is invoked.
		// This is where the latest broadcast configuration is stored.
		userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
		rwContext.setElement(null);
	}
}

2. Key functions of DataStream

// These methods generate the corresponding streams
public class DataStream<T> {
	protected final StreamExecutionEnvironment environment;
	protected final Transformation<T> transformation;

	@PublicEvolving
	public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
		return new BroadcastConnectedStream<>(
				environment,
				this,
				Preconditions.checkNotNull(broadcastStream),
				broadcastStream.getBroadcastStateDescriptor());
	}

	@PublicEvolving
	public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
		final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
		return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
	}
}

3. Key data structure MapStateDescriptor

MapStateDescriptor is mainly used to declare various kinds of metadata. As we will see later, the system stores and retrieves the State corresponding to a MapStateDescriptor by its name, i.e. the first constructor parameter.

public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {
	/**
	 * Create a new {@code MapStateDescriptor} with the given name and the given type serializers.
	 *
	 * @param name The name of the {@code MapStateDescriptor}.
	 * @param keySerializer The type serializer for the keys in the state.
	 * @param valueSerializer The type serializer for the values in the state.
	 */  
	public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
		super(name, new MapSerializer<>(keySerializer, valueSerializer), null);
	}

	/**
	 * Create a new {@code MapStateDescriptor} with the given name and the given type information.
	 *
	 * @param name The name of the {@code MapStateDescriptor}.
	 * @param keyTypeInfo The type information for the keys in the state.
	 * @param valueTypeInfo The type information for the values in the state.
	 */
	public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {
		super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);
	}

	/**
	 * Create a new {@code MapStateDescriptor} with the given name and the given type information.
	 *
	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
	 * consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor.
	 *
	 * @param name The name of the {@code MapStateDescriptor}.
	 * @param keyClass The class of the type of keys in the state.
	 * @param valueClass The class of the type of values in the state.
	 */
	public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
		super(name, new MapTypeInfo<>(keyClass, valueClass), null); }}Copy the code

4. State access

The state shared between processBroadcastElement and processElement is stored inside Flink keyed by the name of the MapStateDescriptor, e.g. ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2). So next we need to introduce Flink's concept of State.

State vs checkpoint

To distinguish the two concepts: state generally refers to the state of a specific task/operator, while a checkpoint is a global snapshot of a Flink job at a specific point in time, covering the state of all its tasks and operators. Flink achieves fault tolerance and recovery by taking checkpoints periodically.

Flink contains two basic states: Keyed State and Operator State.

Keyed State

As the name implies, this is state defined on a KeyedStream. The state is bound to a specific key, and each key on the KeyedStream may have its own state.

Operator State

Unlike Keyed State, Operator State is bound to one parallel instance of an operator, and each instance holds a single such state; with Keyed State, by contrast, one operator instance may hold many states, one per key it processes.

Flink’s Kafka Connector, for example, uses operator state. It stores all (partition, offset) mappings of the consumer topics in each connector instance.

Flink Raw and Managed State

This is another dimension.

Keyed State and Operator State exist in managed and raw State respectively.

Managed state is state managed by the Flink runtime and stored in internal data structures such as hash tables or RocksDB, for example ValueState, ListState, etc. The Flink runtime encodes these states and writes them into checkpoints.

The managed keyed-state interfaces, for example, provide access to different types of state scoped to the key of the current input element. These states are therefore only available on a KeyedStream, which is obtained via stream.keyBy(...). Managed Operator State, on the other hand, is used by implementing the CheckpointedFunction or ListCheckpointed interface (a sketch follows).
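
A minimal sketch of managed operator state through CheckpointedFunction, modeled on the buffering-sink example in the Flink documentation; the class name and buffering logic are illustrative assumptions:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Buffers elements and stores the buffer as managed operator (list) state
public class BufferingSinkSketch implements SinkFunction<String>, CheckpointedFunction {

  private transient ListState<String> checkpointedState;
  private final List<String> buffer = new ArrayList<>();

  @Override
  public void invoke(String value) throws Exception {
    buffer.add(value);  // a real sink would flush once the buffer is large enough
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // copy the in-memory buffer into the managed operator state before the checkpoint is taken
    checkpointedState.clear();
    for (String element : buffer) {
      checkpointedState.add(element);
    }
  }

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    ListStateDescriptor<String> descriptor =
        new ListStateDescriptor<>("buffered-elements", Types.STRING);
    // operator state is obtained from the OperatorStateStore, not from the RuntimeContext
    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

    if (context.isRestored()) {
      for (String element : checkpointedState.get()) {
        buffer.add(element);
      }
    }
  }
}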

Raw state refers to the original state. Users manage the specific data structure of the state and save it in the data structure of the operator. At checkpoint, Flink does not know what it is, but writes a sequence of bytes to checkpoint.

It is generally recommended to use the managed state on DataStream, but the original state is used when implementing a user-defined operator.

Returning to our example: the broadcast variable is a kind of Operator State and is stored as managed MapState. Concretely, this is implemented in DefaultOperatorStateBackend.getBroadcastState.

StateDescriptor

You have to create a StateDescriptor to get the corresponding state handle. This saves the state name (you can create multiple states, and they must have unique names so you can refer to them), the type of value the state holds, and may contain user-specified functions, such as ReduceFunction. Depending on the state type, you can create ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor or MapStateDescriptor.

State is accessed through RuntimeContext and therefore can only be used in Rich Functions.
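
As an illustration of that last point, here is a minimal sketch of keyed state obtained from the RuntimeContext inside a rich function; the class name and counting logic are assumptions, not code from this article:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts how many events each key has seen; usable only on a KeyedStream
public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

  private transient ValueState<Long> countState;

  @Override
  public void open(Configuration parameters) {
    ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Types.LONG);
    // keyed managed state is obtained from the RuntimeContext, hence the Rich* function requirement
    countState = getRuntimeContext().getState(descriptor);
  }

  @Override
  public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
    Long current = countState.value();
    long updated = (current == null ? 0L : current) + 1;
    countState.update(updated);
    out.collect(Tuple2.of(value.f0, updated));
  }
}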

OperatorStateBackend

OperatorStateBackend manages Operator State. Currently there is only one implementation: DefaultOperatorStateBackend.

DefaultOperatorStateBackend

DefaultOperatorStateBackend stores its state in maps. It builds PartitionableListState objects (a kind of ListState) for ordinary operator state, and keeps all Operator State in memory.

public class DefaultOperatorStateBackend implements OperatorStateBackend {

	/** Map for all registered operator states. Maps state name -> state */
	private final Map<String, PartitionableListState<?>> registeredOperatorStates;

	/** Map for all registered operator broadcast states. Maps state name -> state */
	private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

	/**
	 * Cache of already accessed states.
	 *
	 * <p>In contrast to {@link #registeredOperatorStates} which may be repopulated
	 * with restored state, this map is always empty at the beginning.
	 *
	 * <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
	 */
	private final Map<String, PartitionableListState<?>> accessedStatesByName;

	// This caches the broadcast states that have already been accessed
	private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;

	// This is the API through which broadcast state is accessed
	public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {

		String name = Preconditions.checkNotNull(stateDescriptor.getName());

		// If the state has been accessed before, return the cached instance
		BackendWritableBroadcastState<K, V> previous =
			(BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);

		if (previous != null) {
			return previous;
		}

		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
		TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
		TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());

		BackendWritableBroadcastState<K, V> broadcastState =
			(BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);

		if (broadcastState == null) {
			broadcastState = new HeapBroadcastState<>(
					new RegisteredBroadcastStateBackendMetaInfo<>(
							name,
							OperatorStateHandle.Mode.BROADCAST,
							broadcastStateKeySerializer,
							broadcastStateValueSerializer));
			registeredBroadcastStates.put(name, broadcastState);
		} else {
			// has restored state; check compatibility of new state access

			RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo = broadcastState.getStateMetaInfo();

			// check whether new serializers are incompatible
			TypeSerializerSchemaCompatibility<K> keyCompatibility =
				restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);

			TypeSerializerSchemaCompatibility<V> valueCompatibility =
				restoredBroadcastStateMetaInfo.updateValueSerializer(broadcastStateValueSerializer);

			broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
		}

		// Cache the state for later accesses
		accessedBroadcastStatesByName.put(name, broadcastState);
		return broadcastState;
	}
}

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you want to get timely notifications of new articles, or want to see the technical material I recommend, please follow this account.

0xFF Reference

Flink principles and implementation: Flink state management explained  yq.aliyun.com/articles/22…

Dynamic configuration updates in Flink using broadcast  www.jianshu.com/p/c8c99f613…

A practical guide to Flink Broadcast State  blog.csdn.net/u010942041/…

Talking about Flink's Broadcast State  www.jianshu.com/p/d6576ae67…

Working with State  ci.apache.org/projects/fl…