This article is a set of notes based on the Lagou (lagou.com) column "42 Lectures: Pass Flink with Ease". Lagou has many high-quality columns. I am a beginner, and most of my introductions to new technology come from Lagou and Git; from time to time I publish my own summaries on Juejin.

Part II: Advanced

2.2 Core concept analysis

2.2.1 Distributed Cache

One of the most common uses of the distributed cache in production is joining two tables. If one table is large and the other is small, we can cache the small table, keep a copy on each TaskManager, and run the join against that local copy.

Examples:

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class DistributedCacheExample {

    public static void main(String[] args) throws Exception {

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 1: register a file under a name. You can use HDFS files, or local files for testing
        env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/distributedcache.txt", "distributedCache");
        DataSource<String> data = env.fromElements("Linea", "Lineb", "Linec", "Lined");

        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 2: fetch the cached file by its registered name
                File myFile = getRuntimeContext().getDistributedCache().getFile("distributedCache");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.err.println("Distributed cache is :" + line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                // Here you can use dataList
                System.err.println("Use datalist:" + dataList + "-- -- -- -- -- -- --" + value);
                // Business logic
                return dataList + ":" + value;
            }
        });

        result.printToErr();
    }
}

Results:


  1. Register a file in the env environment. The file can come from the local file system or from HDFS, and is given a name
  2. When using the distributed cache, fetch the file directly by its registered name

In the example above, a local file distributedcache.txt is registered under the name distributedCache, and the map operator reads and processes the cached file by that name

[Precautions]:

  • Cached files should be read-only for the duration of the job; otherwise data consistency problems can arise.
  • Do not cache files or data that are too large. Otherwise task execution may slow down, and in extreme cases an OOM (out of memory) error can occur.

2.2.2 Fault Recovery

Automatic fault recovery is a powerful feature of Flink. In a real runtime environment we run into all kinds of problems that cause an application to fail, such as illegal data or network jitter.

The Flink configuration file has a parameter, jobmanager.execution.failover-strategy, that selects the failure recovery strategy. It takes one of two values:

  1. full ==> If any Task in the cluster fails, all Tasks of the job are restarted.

    • In practice, often only one or a few Tasks in the cluster fail, and only the faulty part needs to be restarted…
  2. region ==> A region-based local restart strategy. Under this strategy, Flink divides the Tasks of the job into regions. When a Task fails, Flink computes the smallest set of regions that must be recovered, using the following logic:

    1. The region containing the failed Task must be restarted.
    2. If data that the current region depends on is damaged or partially lost, the region that produces that data must be restarted.
    3. To ensure data consistency, the regions downstream of the current region must be restarted.
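
The three rules above can be illustrated with a toy model. This is only a sketch, not Flink's scheduler code: regions are plain strings, and `RegionFailoverSketch`, the `consumers` and `producers` maps, and the `outputLost` set are hypothetical names. Rule 2 is modeled by listing explicitly which producers have lost their output.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RegionFailoverSketch {

    /** Computes which regions restart when the region `failed` fails (toy model). */
    static Set<String> regionsToRestart(String failed,
                                        Map<String, List<String>> consumers,
                                        Map<String, List<String>> producers,
                                        Set<String> outputLost) {
        Set<String> restart = new LinkedHashSet<>();
        Deque<String> pending = new ArrayDeque<>();
        pending.add(failed); // rule 1: the region of the failed task
        while (!pending.isEmpty()) {
            String region = pending.poll();
            if (!restart.add(region)) {
                continue; // already scheduled for restart
            }
            // rule 2: producers whose output is no longer available
            for (String producer : producers.getOrDefault(region, Collections.emptyList())) {
                if (outputLost.contains(producer)) {
                    pending.add(producer);
                }
            }
            // rule 3: every downstream (consuming) region
            pending.addAll(consumers.getOrDefault(region, Collections.emptyList()));
        }
        return restart;
    }

    /** Hypothetical pipeline A -> B -> C in which region B fails. */
    static Set<String> demo(boolean outputOfALost) {
        Map<String, List<String>> consumers = new HashMap<>();
        consumers.put("A", Arrays.asList("B"));
        consumers.put("B", Arrays.asList("C"));
        Map<String, List<String>> producers = new HashMap<>();
        producers.put("B", Arrays.asList("A"));
        producers.put("C", Arrays.asList("B"));
        Set<String> lost = new HashSet<>();
        if (outputOfALost) {
            lost.add("A");
        }
        return regionsToRestart("B", consumers, producers, lost);
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```

In this model, a failure of B restarts B and C when the output of A is still available, and restarts all three regions when it is not.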

2.2.3 Restart Policy

Common restart policies:

  • Fixed-delay restart policy
  • Failure-rate restart policy
  • No-restart policy

Flink applies default conventions when deciding which restart policy to use:

  1. If checkpointing is enabled but no restart policy is set, the job restarts in fixed-delay mode.
  2. If checkpointing is not enabled, the job does not restart by default.

2.2.3.1 No Restart policy

In this case, if something goes wrong with our job, the task simply exits. We can configure it in flink-conf.yaml:

restart-strategy: none

It can also be specified in a program using code:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

2.2.3.2 Fixed delay Restart policy mode

restart-strategy: fixed-delay

In fixed-delay mode, two additional parameters must be set. Flink retries up to the configured number of attempts and waits the configured delay between attempts:

  1. restart-strategy.fixed-delay.attempts: the number of retries
  2. restart-strategy.fixed-delay.delay: the interval between retries

[Example]: The job is retried up to three times, with a 5-second interval between retries

restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 5 s

It can also be set in code:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
     3, // Number of restart attempts
     Time.of(5, TimeUnit.SECONDS) // Delay between attempts
));

2.2.3.3 Failure Rate Restart policy mode

restart-strategy: failure-rate

This restart mode requires three additional parameters. The failure-rate policy restarts the job after a failure, but once the failure rate (the number of failures per time interval) is exceeded, the job is considered to have finally failed. Between two consecutive restart attempts, the policy waits a fixed amount of time:

  1. restart-strategy.failure-rate.max-failures-per-interval: the maximum number of failures within the given time interval
  2. restart-strategy.failure-rate.failure-rate-interval: the time interval over which the failure rate is measured
  3. restart-strategy.failure-rate.delay: the interval between retries

[Example]: If the job fails three times within five minutes, it is considered to have failed. The retry interval after each failure is five seconds

restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 5 s

It can also be specified directly in code:

env.setRestartStrategy(RestartStrategies.failureRateRestart(
     3, // The maximum number of failures per interval
     Time.of(5, TimeUnit.MINUTES), // The interval over which the failure rate is measured
     Time.of(5, TimeUnit.SECONDS) // The delay between restart attempts
));
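
The failure-rate rule can be pictured as a sliding window over failure timestamps. The following is a plain-Java sketch of that idea, not Flink's implementation: `FailureRateSketch` and `onFailure` are hypothetical names, time is passed in explicitly in milliseconds, and the sketch assumes (following the example above) that the third failure inside the interval is the one that ends the job.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class FailureRateSketch {
    private final int maxFailuresPerInterval;
    private final long intervalMs;
    // Timestamps of recent failures, oldest first
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRateSketch(int maxFailuresPerInterval, long intervalMs) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMs = intervalMs;
    }

    /** Records a failure at nowMs and reports whether the job may still restart. */
    boolean onFailure(long nowMs) {
        failureTimes.addLast(nowMs);
        // Forget failures that have dropped out of the measuring interval
        while (!failureTimes.isEmpty() && nowMs - failureTimes.peekFirst() > intervalMs) {
            failureTimes.removeFirst();
        }
        return failureTimes.size() < maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        FailureRateSketch policy = new FailureRateSketch(3, 5 * 60 * 1000L);
        System.out.println(policy.onFailure(0L));
        System.out.println(policy.onFailure(60_000L));
        System.out.println(policy.onFailure(120_000L));
    }
}
```

With three failures at minutes 0, 1, and 2, the third call reports that no further restart is allowed. If the third failure instead arrives after the first two have aged out of the five-minute interval, the job may restart again.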

[Precautions] In production, it is recommended to configure the retry mechanism and restart policy per job, because each job has a different load and resource consumption

2.2.4 Parallelism

Parallelism is one of the core concepts of Flink job execution. It is the number of subtasks into which an operator is split for parallel execution in a distributed runtime environment. Increasing the parallelism of a job can make it run faster.


In general, parallelism can be set at four levels:

  1. Operator level: call setParallelism in code to set the parallelism of an individual operator. For example:

    DataSet<Tuple2<String, Integer>> counts =
          text.flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1).setParallelism(1);

    Every Flink operator can have its parallelism set individually. This is the most recommended approach, because each operator can be tuned on its own

  2. Execution environment level: call env.setParallelism() to set the parallelism of the current execution environment. This setting applies to all operators, sources, and sinks of the current job, and can be overridden by setting parallelism at the operator level:

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(5);

  3. Submission level: when submitting a job, the user can pass the -p parameter to set its parallelism. For example:

    ./bin/flink run -p 10 WordCount.jar

  4. System level: the parallelism.default setting in flink-conf.yaml sets the default parallelism for all execution environments.

Execution priority: Operator level > Execution environment level > Submitted Task level > System Configuration level
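
The priority order can be read as "the most specific setting that is present wins". The sketch below only illustrates that lookup order in plain Java. `ParallelismSketch.resolve` is a hypothetical helper, not a Flink API, and `null` stands for "not set at this level".

```java
class ParallelismSketch {

    /** Returns the effective parallelism; a null argument means that level was not configured. */
    static int resolve(Integer operatorLevel, Integer environmentLevel,
                       Integer submitLevel, int systemDefault) {
        if (operatorLevel != null) {
            return operatorLevel;       // setParallelism() on the operator
        }
        if (environmentLevel != null) {
            return environmentLevel;    // env.setParallelism()
        }
        if (submitLevel != null) {
            return submitLevel;         // flink run -p
        }
        return systemDefault;           // parallelism.default
    }

    public static void main(String[] args) {
        // Operator set to 1, environment to 5, -p 10, system default 2
        System.out.println(resolve(1, 5, 10, 2));
        // Nothing set in code, only -p 10
        System.out.println(resolve(null, null, 10, 2));
    }
}
```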

2.3 Window & Time & Watermark

Three kinds of windows: tumbling windows, sliding windows, and session windows

Three notions of time: event time, ingestion time, and processing time

The concept of “time” in Flink is introduced below:

2.3.1 Time

2.3.1.1 Event time

Event time is the time at which a record was generated, and it is carried by the data producer. For example, every Kafka message carries a timestamp that records when it was produced. Event time is fixed at the moment the message is generated and never changes; it is the notion of time we use most often.

To use event time, you need to specify how event-time watermarks are generated, and it is generally used together with windows

We can set the time type used by Flink to EventTime in code:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the time attribute to EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);

Flink registers event-time timers through InternalTimerServiceImpl.registerEventTimeTimer:

	@Override
	public void registerEventTimeTimer(N namespace, long time) {
		eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
	}

This method takes two parameters, namespace and time, where:

  • time is the time at which the timer fires
  • namespace is wrapped, together with the current key and the time, in a TimerHeapInternalTimer object, which is then put into a KeyGroupedInternalPriorityQueue

When does Flink use these timers to trigger computation? The answer is in the InternalTimerServiceImpl.advanceWatermark method:

public void advanceWatermark(long time) throws Exception {
   currentWatermark = time;

   InternalTimer<K, N> timer;

   while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
      eventTimeTimersQueue.poll();
      keyContext.setCurrentKey(timer.getKey());
      triggerTarget.onEventTime(timer);
   }
}

The while loop takes from eventTimeTimersQueue, in order, every timer whose trigger time is less than or equal to the time argument, and calls triggerTarget.onEventTime() on each of them
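
The register-then-advance pattern described above can be reproduced without Flink. The following is a minimal sketch using a plain `PriorityQueue` of timestamps; `EventTimeTimerSketch` is a hypothetical class, and keys, namespaces, and the trigger target are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class EventTimeTimerSketch {
    // Smallest timestamp first, like the event-time timer queue
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    /** Fires every timer whose timestamp is <= watermark and returns them in firing order. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        service.registerEventTimeTimer(30L);
        service.registerEventTimeTimer(10L);
        service.registerEventTimeTimer(20L);
        System.out.println(service.advanceWatermark(20L)); // timers 10 and 20 fire
        System.out.println(service.advanceWatermark(25L)); // nothing to fire
        System.out.println(service.advanceWatermark(30L)); // timer 30 fires
    }
}
```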

2.3.1.2 Ingestion time

Ingestion time is the time at which an event enters the Flink system. At the Flink source, each event takes the current time as its timestamp, and subsequent window processing is based on that timestamp. Conceptually, ingestion time sits between event time and processing time.

Compared with event time, ingestion time cannot handle late or out-of-order events, but it does not require specifying how watermarks are generated. Internally, ingestion time is handled much like event time, except that timestamps are assigned and watermarks are generated automatically.

Ingestion time prevents data from being reordered inside Flink, but it cannot fix data that was already out of order before it reached Flink. If you need to handle such cases, EventTime is recommended.

The code that generates timestamps and watermarks for the ingestion-time type is in AutomaticWatermarkContext:


A watermark send timer will be set and will be triggered after the watermarkInterval.

The code to process the data is in the processAndCollect() method:

		@Override
		protected void processAndCollect(T element) {
			lastRecordTime = this.timeService.getCurrentProcessingTime();
			output.collect(reuse.replace(element, lastRecordTime));

			// this is to avoid lock contention in the lockingObject by
			// sending the watermark before the firing of the watermark
			// emission task.
			if (lastRecordTime > nextWatermarkTime) {
				// in case we jumped some watermarks, recompute the next watermark time
				final long watermarkTime = lastRecordTime - (lastRecordTime % watermarkInterval);
				nextWatermarkTime = watermarkTime + watermarkInterval;
				output.emitWatermark(new Watermark(watermarkTime));

				// we do not need to register another timer here
				// because the emitting task will do so.
			}
		}

2.3.1.3 Processing time

Processing time is the system time of the machine at the moment the data is processed by Flink. It is the simplest notion of time in Flink, but it is not deterministic: it is affected by, for example, delays in messages reaching the processing node.

We can also specify the Processing Time used by the Flink system in our code:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

Likewise, the Flink source shows how processing-time timers are registered and used, in InternalTimerServiceImpl.registerProcessingTimeTimer:

@Override
public void registerProcessingTimeTimer(N namespace, long time) {
   InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
   if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
      long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
      // check if we need to re-schedule our timer to earlier
      if (time < nextTriggerTime) {
         if (nextTimer != null) {
            nextTimer.cancel(false);
         }
         nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
      }
   }
}

Whenever a new timer is added to the processingTimeTimersQueue priority queue and its timestamp is earlier than that of the current head, the earlier timer is registered with the ScheduledThreadPoolExecutor-based timer service

Processing-time timers are fired in the InternalTimerServiceImpl.onProcessingTime() method:

	private void onProcessingTime(long time) throws Exception {
		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
		// inside the callback.
		nextTimer = null;

		InternalTimer<K, N> timer;

		while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
			processingTimeTimersQueue.poll();
			keyContext.setCurrentKey(timer.getKey());
			triggerTarget.onProcessingTime(timer);
		}

		if (timer != null && nextTimer == null) {
			nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);
		}
	}

The loop walks through all timers whose time is less than or equal to the input time and runs the onProcessingTime() method of triggerTarget on each

2.3.2 Watermark

The watermark is one of the harder concepts to grasp in the Flink framework, in large part because of how the term is translated.

“Watermark” is normally translated as “water level”, but in the Flink framework “water-level line” is the more fitting reading; in essence, a watermark is a timestamp


According to section 2.3.1, we know:

  • EventTime: each record carries its own timestamp;
  • ProcessingTime: records carry no timestamp information;
  • IngestionTime: similar to EventTime, except that Flink binds the system time to each record as its timestamp. This prevents reordering inside Flink, but cannot fix data that was already out of order before it reached Flink.

Therefore, we use EventTime together with watermarks to handle out-of-order messages

2.3.2.1 Nature of watermark

A watermark is a timestamped element in a DataStream that addresses out-of-order data in real-time computation. If a watermark with timestamp T appears in the Flink system, it means that all data with EventTime < T has arrived, and any window whose end time equals T is triggered for computation.

In other words, the watermark is the criterion Flink uses to decide whether data is late, and it is also the signal that triggers window computation.

When the program parallelism is greater than 1, several parallel streams produce watermarks, and Flink takes the watermark with the smallest timestamp.
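
Taking the smallest watermark across parallel inputs can be sketched as follows. This is an illustration in plain Java, not Flink code: `MinWatermarkSketch` and its channel indices are hypothetical, and each input is assumed to start at `Long.MIN_VALUE` until it reports its first watermark.

```java
import java.util.Arrays;

class MinWatermarkSketch {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    MinWatermarkSketch(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the combined watermark. */
    long onWatermark(int channel, long watermark) {
        // A channel's watermark never moves backwards
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        // The combined watermark is the minimum over all inputs
        current = Math.max(current, min);
        return current;
    }

    public static void main(String[] args) {
        MinWatermarkSketch op = new MinWatermarkSketch(2);
        System.out.println(op.onWatermark(0, 100L)); // input 1 has reported nothing yet
        System.out.println(op.onWatermark(1, 50L));  // slowest input decides
        System.out.println(op.onWatermark(1, 200L)); // now input 0 is the slowest
    }
}
```

The slowest input holds the combined watermark back, which is why one idle or lagging parallel stream can delay window results for the whole operator.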

2.3.2.2 How is watermark generated

Flink provides the assignTimestampsAndWatermarks() method to extract timestamps and specify watermarks. The method accepts two kinds of argument: AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks

2.3.2.3 Types of watermarks

Periodic watermark

When watermarks are generated periodically with AssignerWithPeriodicWatermarks, the default period is 200 ms. The place where this interval is set is as follows:

@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}

  • Recall that when discussing time types we used env.setStreamTimeCharacteristic() to specify the time type used by Flink. As the method above shows, if the user passes TimeCharacteristic.EventTime, the AutoWatermarkInterval is set to 200 ms
  • We can also call ExecutionConfig.setAutoWatermarkInterval() to set the automatic generation interval ourselves.

As you can see in the class diagram in 2.3.2.2, we need the TimestampAssigner#extractTimestamp() method to extract the EventTime

Flink provides three timestamp extractors for this:

  1. AscendingTimestampExtractor
  2. BoundedOutOfOrdernessTimestampExtractor
  3. IngestionTimeExtractor

BoundedOutOfOrdernessTimestampExtractor is the most commonly used. Pay special attention to its maxOutOfOrderness parameter, which is the time range within which data may arrive out of order. In short, this extractor allows data to be late by up to maxOutOfOrderness.

 public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
     if (maxOutOfOrderness.toMilliseconds() < 0) {
         throw new RuntimeException("Tried to set the maximum allowed " +
             "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
     }
     this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
     this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
 }

 public abstract long extractTimestamp(T element);

 @Override
 public final Watermark getCurrentWatermark() {
     long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
     if (potentialWM >= lastEmittedWatermark) {
         lastEmittedWatermark = potentialWM;
     }
     return new Watermark(lastEmittedWatermark);
 }

 @Override
 public final long extractTimestamp(T element, long previousElementTimestamp) {
     long timestamp = extractTimestamp(element);
     if (timestamp > currentMaxTimestamp) {
         currentMaxTimestamp = timestamp;
     }
     return timestamp;
 }
Punctuated watermark

Flink provides no built-in implementation of this kind of watermark generation. It is suited to deciding, per received message, whether a watermark should be emitted, and it is rarely used

As a simple example, if a received MyData record starts with the string watermark, a watermark is emitted:

data.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyData>() {

      @Override
      public Watermark checkAndGetNextWatermark(MyData element, long l) {
        // Emit a watermark only when the record starts with "watermark"
        return element.getRecord().startsWith("watermark") ? new Watermark(l) : null;
      }

      @Override
      public long extractTimestamp(MyData element, long l) {
        return element.getTimestamp();
      }
});

class MyData {
    private String record;
    private Long timestamp;

    public String getRecord() {
        return record;
    }

    public void setRecord(String record) {
        this.record = record;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }
}

2.3.2.4 Example

[Goal] Simulate a program that receives a DataStream from a socket in real time. The code uses AssignerWithPeriodicWatermarks to set the watermark, converts the received data, groups it by the first field, and outputs the record with the smallest second field within each 5-second window

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    // Set it to eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Set the watermark generation interval to 100ms
    env.getConfig().setAutoWatermarkInterval(100);

    DataStream<String> dataStream = env
            .socketTextStream("127.0.0.1", 9000)
            .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
                private Long currentTimeStamp = 0L;
                // Set the allowed out-of-order time in milliseconds
                private Long maxOutOfOrderness = 5000L;

                @Override
                public Watermark getCurrentWatermark() {

                    return new Watermark(currentTimeStamp - maxOutOfOrderness);
                }

                @Override
                public long extractTimestamp(String s, long l) {
                    String[] arr = s.split(",");
                    long timeStamp = Long.parseLong(arr[1]);
                    currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                    System.err.println(s + ",EventTime:" + timeStamp + ",watermark:" + (currentTimeStamp - maxOutOfOrderness));
                    return timeStamp;
                }
            });

    dataStream.map(new MapFunction<String, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(String s) throws Exception {

            String[] split = s.split(",");
            return new Tuple2<String, Long>(split[0], Long.parseLong(split[1]));
        }
    })
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .minBy(1)
            .print();

    env.execute("WaterMark Test Demo");

}
The first test

The test data are as follows:

flink,1588659181000
flink,1588659182000
flink,1588659183000
flink,1588659184000
flink,1588659185000

A simple estimate: the timestamp of the first record is 1588659181000 and the window size is 5 seconds, so the window computation should be triggered when flink,1588659185000 arrives.
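
The estimate can be checked with a little arithmetic. The sketch below assumes that tumbling windows are aligned to multiples of the window size (no offset) and that timestamps are non-negative; `TumblingWindowSketch` is a hypothetical helper, not a Flink class.

```java
class TumblingWindowSketch {

    /** Start of the tumbling window that contains the timestamp (inclusive). */
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    /** End of the tumbling window that contains the timestamp (exclusive). */
    static long windowEnd(long timestamp, long sizeMs) {
        return windowStart(timestamp, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long first = 1588659181000L;
        long size = 5000L;
        System.out.println("[" + windowStart(first, size) + ", " + windowEnd(first, size) + ")");
    }
}
```

Under these assumptions the first record falls into the window [1588659180000, 1588659185000), so a record with timestamp 1588659185000 is the first one that lies past the end of that window.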

We open the port with the nc -lk 9000 command, then enter the test data above and watch the console output:

Obviously, you can see that the window triggers the calculation when the fifth data is present

The second test

To simulate the out-of-order situation again, assume that our data source is as follows:

flink,1588659181000
flink,1588659182000
flink,1588659183000
flink,1588659184000
flink,1588659185000
flink,1588659180000
flink,1588659186000
flink,1588659187000
flink,1588659188000
flink,1588659189000
flink,1588659190000

The output of the run is as follows:

As you can see, the message with timestamp 1588659180000 is not processed. That is because at this point the allowed out-of-order time in the code is private Long maxOutOfOrderness = 0L, which means that out-of-order messages are not handled

Now change private Long maxOutOfOrderness to 5000L, which means that messages may be up to 5 seconds out of order, and send the same data to the socket port again

As you can see, sending all the data triggered only one window computation, and the watermark time in the output lags by 5 seconds. The maxOutOfOrderness setting therefore affects both when windows are computed and the watermark time, as shown in the following figure:

If we continue to send data to the socket:

flink,1588659191000
flink,1588659192000
flink,1588659193000
flink,1588659194000
flink,1588659195000

You can see when the next window will trigger:

[Notes] In production, Flink combines time, windows, and watermarks to deal with out-of-order data. A window fires when both of the following conditions hold:

  1. Watermark time >= window_end_time;
  2. There is data in [window_start_time, window_end_time), an interval that is closed on the left and open on the right.
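
The two conditions above can be replayed against the data of the second test without starting Flink. This is a simplified simulation, not the real runtime: `WatermarkReplaySketch` is a hypothetical class, the watermark is assumed to advance after every record (Flink emits it periodically), and records whose window has already fired are simply dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class WatermarkReplaySketch {
    // Event timestamps of the second test, in arrival order
    static final long[] SECOND_TEST = {
        1588659181000L, 1588659182000L, 1588659183000L, 1588659184000L,
        1588659185000L, 1588659180000L, 1588659186000L, 1588659187000L,
        1588659188000L, 1588659189000L, 1588659190000L
    };

    /** Returns the start timestamps of the windows that fire, in firing order. */
    static List<Long> firedWindows(long[] events, long windowSize, long maxOutOfOrderness) {
        TreeSet<Long> openWindows = new TreeSet<>(); // starts of windows that hold data
        List<Long> fired = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : events) {
            long start = ts - (ts % windowSize);
            if (start + windowSize <= watermark) {
                continue; // late record: its window has already fired
            }
            openWindows.add(start); // condition 2: the window contains data
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermark = maxTimestamp - maxOutOfOrderness;
            // condition 1: watermark >= window_end_time
            while (!openWindows.isEmpty() && openWindows.first() + windowSize <= watermark) {
                fired.add(openWindows.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(firedWindows(SECOND_TEST, 5000L, 0L));
        System.out.println(firedWindows(SECOND_TEST, 5000L, 5000L));
    }
}
```

With maxOutOfOrderness set to 0 the simulation fires two windows and drops the record 1588659180000. With 5000 it fires only the first window, and that window now includes the out-of-order record, which agrees with the behaviour described in the two runs.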

In addition, because watermarks are created as objects and sent downstream, they also consume memory. The timing and frequency of watermark generation should therefore be kept under control, otherwise normal job execution can be affected

2.4 State & Fault tolerance

In Flink’s framework, stateful computation is one of the most important features of Flink. The so-called state, in fact, refers to the intermediate calculation results of Flink program. Flink supports different types of state and has a dedicated mechanism and state manager for state persistence.

2.4.1 State

We found this passage in Flink’s official blog, which can be read as a description of state:

When working with state, it might also be useful to read about Flink’s state backends. Flink provides different state backends that specify how and where state is stored. State can be located on Java’s heap or off-heap. Depending on your state backend, Flink can also manage the state for the application, meaning Flink deals with the memory management (possibly spilling to disk if necessary) to allow applications to hold very large state. State backends can be configured without changing your application logic.


State is the data that needs to be remembered during stream processing; it can be business data or metadata. Flink provides different state backends to manage state, and the state can be very large

Flink’s state data can live in JVM heap memory, in off-heap memory, or in third-party storage such as RocksDB, which Flink supports out of the box. Flink’s official website also lists several scenarios that call for stateful computation:

  1. When an application searches for certain event patterns, the state will store the sequence of events encountered so far

    Complex event processing gets events that conform to a specific time rule

  2. When aggregating events per minute/hour/day, the state holds the pending aggregates

    Aggregate calculation

  3. When training a machine learning model over a stream of data points, the state holds the current version of the model parameters

    Model training for machine learning

  4. When historic data needs to be managed, the state allows efficient access to events that occurred in the past

    Use historical data for calculations

2.4.2 State Classification & Usage

The concept of KeyedStream and the use of the keyBy operator were covered in an earlier lesson.

In Flink, state is divided into Keyed State and Operator State (non-keyed state), depending on whether the data set is partitioned by a key.

As shown in the figure above, Keyed State is state on a partitioned stream, and each key has its own state. The octagon, circle, and triangle in the figure each manage their own state, and only records with a given key can access and update the corresponding state.

Unlike Keyed State, Operator State can be used in any operator. Each operator subtask (parallel instance) holds one state that is shared by all data flowing into that subtask, and any of that data can access and update it.

Note that both Keyed State and Operator State are local: each operator subtask maintains its own state storage, and subtasks cannot access one another’s state.

For Keyed State, Flink provides several ready-to-use data structures:

  • ValueState
  • MapState
  • AppendingState
    • ReducingState
    • AggregatingState
    • ListState
  • ReadOnlyBroadcastState

How do we access these states? Flink provides StateDescriptor for accessing the different kinds of state. The class diagram looks like this:

StateDescriptor and ValueState

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ValueStateExample {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 5L), Tuple2.of(1L, 2L))
                .keyBy(0)
                .flatMap(new CountWindowAverage())
                .printToErr();

        env.execute("submit job");
    }

    public static class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        // f0 holds the count, f1 holds the running sum
        private transient ValueState<Tuple2<Long, Long>> sum;

        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

            Tuple2<Long, Long> currentSum;
            // Access the ValueState
            if (sum.value() == null) {
                currentSum = Tuple2.of(0L, 0L);
            } else {
                currentSum = sum.value();
            }

            // Increment the count
            currentSum.f0 += 1;

            // Add the input value to the running sum
            currentSum.f1 += input.f1;

            // Update the state
            sum.update(currentSum);

            // If the count is greater than or equal to 2, emit the average and clear the state
            if (currentSum.f0 >= 2) {
                out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
                sum.clear();
            }
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<>(
                            "average", // the state's name
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information of the state

            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.seconds(10))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
            descriptor.enableTimeToLive(ttlConfig);

            sum = getRuntimeContext().getState(descriptor);
        }
    }
}
  • State is accessed by extending RichFlatMapFunction

  • A handle to the state is obtained through getRuntimeContext().getState(descriptor)

    The actual reads and updates of the state happen in the flatMap function.

  • The output condition here is that whenever the count (the first field) reaches 2, the sum (the second field) is divided by the count. The final output is:

    In practice, Operator State is less common than Keyed State. It is generally used in operators such as sources or sinks, to save the offsets of incoming data or to buffer output data, in order to guarantee the exactly-once semantics of a Flink application
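
The averaging logic of CountWindowAverage can be traced by hand with a plain map standing in for the keyed ValueState. This is only a simulation of the state handling, with no Flink runtime and no TTL; `KeyedAverageSketch` and its `process` method are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class KeyedAverageSketch {
    // Stand-in for the keyed ValueState: key -> {count, sum}
    private final Map<Long, long[]> state = new HashMap<>();

    /** Processes one (key, value) record; returns the emitted average or null if nothing is emitted. */
    Long process(long key, long value) {
        long[] acc = state.computeIfAbsent(key, k -> new long[2]);
        acc[0] += 1;       // count
        acc[1] += value;   // sum
        if (acc[0] >= 2) {
            state.remove(key);          // corresponds to sum.clear()
            return acc[1] / acc[0];     // integer average, as in the example
        }
        return null;
    }

    public static void main(String[] args) {
        KeyedAverageSketch fn = new KeyedAverageSketch();
        long[][] records = {{1, 3}, {1, 5}, {1, 7}, {1, 5}, {1, 2}};
        for (long[] r : records) {
            Long avg = fn.process(r[0], r[1]);
            if (avg != null) {
                System.out.println("(" + r[0] + "," + avg + ")");
            }
        }
    }
}
```

For the five input records of the example this yields (1,4) and (1,6). The last record, (1,2), stays in the state because no second record arrives to complete the pair.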

We can also set an expiration time (TTL) for any state data. If a TTL is configured for a state and the value has expired, the stored value is cleaned up.

To use TTL, first build a StateTtlConfig configuration object; TTL can then be enabled on any state descriptor by passing it this configuration.

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(10))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

descriptor.enableTimeToLive(ttlConfig);

There are a few configurations in the StateTtlConfig class that need our attention:

UpdateType indicates when the expiration time is updated, and for those expired states, whether they are still accessible depends on the configuration of StateVisibility
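
How the two settings interact can be shown with a small model. The sketch below is not Flink's TTL implementation: `TtlValueSketch` is a hypothetical class, the clock is passed in explicitly, only writes refresh the expiration time (as with OnCreateAndWrite), expired values are hidden from readers (as with NeverReturnExpired), and the exact boundary at which a value counts as expired is an assumption.

```java
class TtlValueSketch<T> {
    private final long ttlMs;
    private T value;
    private long lastWriteMs;
    private boolean present;

    TtlValueSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Writing creates or overwrites the value and restarts the TTL. */
    void update(T newValue, long nowMs) {
        value = newValue;
        lastWriteMs = nowMs;
        present = true;
    }

    /** Reading does not refresh the TTL; an expired value is never returned. */
    T value(long nowMs) {
        if (!present || nowMs - lastWriteMs >= ttlMs) {
            return null;
        }
        return value;
    }

    public static void main(String[] args) {
        TtlValueSketch<String> state = new TtlValueSketch<>(10_000L);
        state.update("a", 0L);
        System.out.println(state.value(9_999L));   // still visible
        System.out.println(state.value(10_000L));  // expired, hidden from the reader
    }
}
```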

2.4.3 State Backend Types & Configuration

By default, Flink's state is stored in the TaskManager's memory. Flink provides three state backends for storing state in different scenarios:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

2.4.3.1 MemoryStateBackend

MemoryStateBackend stores state data in memory and is used for local debugging. Note the following points when using MemoryStateBackend:

  • Each individual state is limited to 5MB by default; the limit can be raised through the constructor
  • The size of a state cannot exceed the Akka frame size
  • The aggregated state must fit into the JobManager's memory

MemoryStateBackend can be specified explicitly in code:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE, false));

The false in new MemoryStateBackend(DEFAULT_MAX_STATE_SIZE, false) disables the asynchronous snapshot mechanism.

2.4.3.2 FsStateBackend

FsStateBackend keeps the working state in TaskManager memory. At checkpoint time, a state snapshot is written to the configured file system directory, and a small amount of metadata is stored in JobManager memory.

Because the snapshots are stored in an external system such as HDFS, FsStateBackend is suitable for large jobs, large state, and tasks that require global high availability.

Using FsStateBackend requires us to specify a file path, usually an HDFS path such as hdfs://namenode:40010/flink/checkpoints

We can also specify it explicitly in code:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints", false));

2.4.3.3 RocksDBStateBackend

Like FsStateBackend, RocksDBStateBackend requires an external file storage path and is suitable for large jobs, large state, and tasks that require global high availability.

RocksDBStateBackend stores the working state in a RocksDB database. By default, RocksDB stores its data in the data directory of the node where the TaskManager runs.

This means that RocksDBStateBackend can hold far more state than FsStateBackend, avoiding the OOM that a surge in state can cause with FsStateBackend. However, because state is stored in RocksDB, throughput is lower.

Note: RocksDBStateBackend is the only state backend that supports incremental snapshots.

2.5 Side Output Stream Splitting

In production it is common to need to split an input stream on demand, for example "split the order stream by order amount" or "split user access logs by the visitor's geographic location".

Side outputs were introduced to split an input stream in this way.

2.5.1 Ways to Split a Stream

2.5.1.1 Splitting with Filter

In section 1.4.2.4 we mentioned the Filter operator, which filters elements according to a user-supplied condition. Each element is passed to the filter() function; if filter() returns true the element is kept, otherwise it is discarded.

In stream processing, we can apply several filters to the same stream to produce a separate stream for each kind of data we need:

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Build the data source
    List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
    data.add(new Tuple3<>(0, 1, 0));
    data.add(new Tuple3<>(0, 1, 1));
    data.add(new Tuple3<>(0, 2, 2));
    data.add(new Tuple3<>(0, 1, 3));
    data.add(new Tuple3<>(1, 2, 5));
    data.add(new Tuple3<>(1, 2, 9));
    data.add(new Tuple3<>(1, 2, 11));
    data.add(new Tuple3<>(1, 2, 13));

    DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env.fromCollection(data);

    // One filter per output stream; each filter sees every record
    SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
    SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);

    zeroStream.print();
    oneStream.printToErr();

    // Run the job
    String jobName = "user defined streaming source";
    env.execute(jobName);
}

In the above example we split the original stream with the filter operator. Records whose first field is 0 go to zeroStream, records whose first field is 1 go to oneStream, and both streams are then printed.

The disadvantage of Filter is obvious: to get each stream we need, the original stream has to be traversed once per filter, which wastes cluster resources.
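The cost is easy to see in a plain-Java model of the same eight records. This is a sketch, not Flink code; FilterCostDemo, splitWithFilters and splitInOnePass are hypothetical names, and each record is reduced to an int array. Every filter inspects every element, while a single routing pass inspects each element once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java comparison of "one filter per output" against a single routing pass.
class FilterCostDemo {
    /** Number of elements inspected by the most recent split. */
    static int inspected;

    /** One filter per output: every filter walks the whole input. */
    static Map<Integer, List<int[]>> splitWithFilters(List<int[]> input, int... keys) {
        inspected = 0;
        Map<Integer, List<int[]>> out = new HashMap<>();
        for (int key : keys) {
            List<int[]> bucket = new ArrayList<>();
            for (int[] t : input) {
                inspected++;
                if (t[0] == key) {
                    bucket.add(t);
                }
            }
            out.put(key, bucket);
        }
        return out;
    }

    /** Single pass: each element is inspected once and routed to its bucket. */
    static Map<Integer, List<int[]>> splitInOnePass(List<int[]> input) {
        inspected = 0;
        Map<Integer, List<int[]>> out = new HashMap<>();
        for (int[] t : input) {
            inspected++;
            out.computeIfAbsent(t[0], k -> new ArrayList<>()).add(t);
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> data = Arrays.asList(
                new int[] {0, 1, 0}, new int[] {0, 1, 1},
                new int[] {0, 2, 2}, new int[] {0, 1, 3},
                new int[] {1, 2, 5}, new int[] {1, 2, 9},
                new int[] {1, 2, 11}, new int[] {1, 2, 13});

        splitWithFilters(data, 0, 1);
        System.out.println("two filters: " + inspected); // two filters: 16

        splitInOnePass(data);
        System.out.println("one pass: " + inspected);    // one pass: 8
    }
}
```

The gap grows linearly with the number of output streams, which is the motivation for the single-pass approaches in the following sections.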

2.5.1.2 Splitting with Split

Define an OutputSelector in the split operator and override its select method to tag different kinds of data, then use the select method on the returned SplitStream to pick out the data for each tag.

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Build the data source
    List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
    data.add(new Tuple3<>(0, 1, 0));
    data.add(new Tuple3<>(0, 1, 1));
    data.add(new Tuple3<>(0, 2, 2));
    data.add(new Tuple3<>(0, 1, 3));
    data.add(new Tuple3<>(1, 2, 5));
    data.add(new Tuple3<>(1, 2, 9));
    data.add(new Tuple3<>(1, 2, 11));
    data.add(new Tuple3<>(1, 2, 13));

    DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env.fromCollection(data);

    // Tag each record with the name of the stream it belongs to
    SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
        @Override
        public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
            List<String> tags = new ArrayList<>();
            if (value.f0 == 0) {
                tags.add("zeroStream");
            } else if (value.f0 == 1) {
                tags.add("oneStream");
            }
            return tags;
        }
    });

    splitStream.select("zeroStream").print();
    splitStream.select("oneStream").printToErr();

    // Run the job
    String jobName = "user defined streaming source";
    env.execute(jobName);
}

If split is called again on the zeroStream or oneStream streams that have already been split, the following exception is thrown:

Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.

The split method has been deprecated; side outputs are the recommended way to split a stream.

2.5.1.3 Splitting with Side Outputs (recommended)

Side outputs are the newest splitting method Flink provides and the one it recommends. Using them takes the following steps:

  • Define the OutputTag
  • Call a specific function to split the data
    • ProcessFunction
    • KeyedProcessFunction
    • CoProcessFunction
    • KeyedCoProcessFunction
    • ProcessWindowFunction
    • ProcessAllWindowFunction

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Build the data source
    List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
    data.add(new Tuple3<>(0, 1, 0));
    data.add(new Tuple3<>(0, 1, 1));
    data.add(new Tuple3<>(0, 2, 2));
    data.add(new Tuple3<>(0, 1, 3));
    data.add(new Tuple3<>(1, 2, 5));
    data.add(new Tuple3<>(1, 2, 9));
    data.add(new Tuple3<>(1, 2, 11));
    data.add(new Tuple3<>(1, 2, 13));

    DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env.fromCollection(data);

    // Anonymous subclasses ({}) let Flink capture the generic type of each tag
    OutputTag<Tuple3<Integer, Integer, Integer>> zeroStream = new OutputTag<Tuple3<Integer, Integer, Integer>>("zeroStream") {};
    OutputTag<Tuple3<Integer, Integer, Integer>> oneStream = new OutputTag<Tuple3<Integer, Integer, Integer>>("oneStream") {};

    SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream = items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
        @Override
        public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
            // Route each record to a side output in a single pass
            if (value.f0 == 0) {
                ctx.output(zeroStream, value);
            } else if (value.f0 == 1) {
                ctx.output(oneStream, value);
            }
        }
    });

    DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
    DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);

    zeroSideOutput.print();
    oneSideOutput.printToErr();

    // Run the job
    String jobName = "user defined streaming source";
    env.execute(jobName);
}

As you can see, the stream is split and the results are printed successfully.

2.6 CEP Complex Event Processing

Complex Event Processing (CEP) is one of Flink’s most interesting features. For an explanation of CEP, we quote from Wikipedia:

CEP is event processing that combines data from multiple sources to infer events or patterns that suggest more complicated circumstances. The goal of complex event processing is to identify meaningful events (such as opportunities or threats) and respond to them as quickly as possible.


In other words, CEP is an event-processing technique that combines multiple data sources to infer events and patterns in more complex scenarios. Its purpose is to identify high-value events (e.g., opportunities or threats) and respond to them quickly.

For example:

  • Finding fraudulent trades among a large number of orders
  • Finding users in a site's access logs who attack the login with scripts or tools
  • Finding packages that have been stuck in transit for a long time among express shipments
  • ...

2.6.1 Program structure

  1. Define the pattern
  2. Match the pattern and process the results

Official example:

DataStream<Event> input = ...

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
         new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
         }
    );

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void processMatch(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(pattern));
        }
    });

The program structure of this example is as follows:

  1. The first step is to define a Pattern. Here the pattern first matches an event whose id equals 42, then requires the very next event to be a SubEvent whose volume is at least 10.0, and finally matches a later event whose name equals end.
  2. The second step is to match the pattern and emit an alert. The input stream is matched against the defined pattern, and each time the pattern matches, an alert is emitted.
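The difference between next (strict contiguity) and followedBy (relaxed contiguity) in this pattern can be sketched as a plain-Java scan over a list. This is an illustrative model only, not how Flink CEP is implemented (Flink compiles patterns into an NFA): CepSketch, Event and match are hypothetical names, and the SubEvent subtype check, time windows and keying are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of: begin(id == 42).next(volume >= 10.0).followedBy(name == "end")
class CepSketch {

    /** Hypothetical event type; the fields mirror the getters used in the pattern. */
    static final class Event {
        final int id;
        final String name;
        final double volume;

        Event(int id, String name, double volume) {
            this.id = id;
            this.name = name;
            this.volume = volume;
        }
    }

    /**
     * Returns the [start, middle, end] index triple of every match.
     * next:       the middle event must be the event right after start.
     * followedBy: non-matching events before the end event are skipped.
     */
    static List<int[]> match(List<Event> events) {
        List<int[]> matches = new ArrayList<>();
        for (int start = 0; start + 1 < events.size(); start++) {
            if (events.get(start).id != 42) {
                continue;
            }
            int middle = start + 1;
            if (events.get(middle).volume < 10.0) {
                continue; // strict contiguity: nothing may be skipped here
            }
            for (int end = middle + 1; end < events.size(); end++) {
                if ("end".equals(events.get(end).name)) {
                    matches.add(new int[] {start, middle, end});
                    break; // only the first following "end" is taken
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(42, "a", 1.0),    // 0: start
                new Event(7, "b", 12.5),    // 1: middle, directly after start
                new Event(8, "c", 0.5),     // 2: skipped by followedBy
                new Event(9, "end", 0.0),   // 3: end
                new Event(42, "d", 1.0),    // 4: start again
                new Event(10, "e", 2.0),    // 5: volume too low, so next fails
                new Event(11, "end", 0.0)); // 6: never reached for the second start
        for (int[] m : match(events)) {
            System.out.println(Arrays.toString(m)); // [0, 1, 3]
        }
    }
}
```

The second candidate starting at index 4 is dropped because the event immediately after it does not satisfy the middle condition, whereas the unrelated event at index 2 does not prevent the first match.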

2.6.2 Pattern Definition

Flink supports a very rich set of pattern definitions, which are the basis for implementing complex business logic. The complete pattern definition API is documented on the official website.

2.6.3 Source Code Analysis

The overall flow of Flink CEP is:

  1. Data is read from a Source
  2. The Pattern operator converts it into a PatternStream
  3. The select/process operator converts it into a DataStream

In the PatternStream class, observe the select()/process() method:

    public <R> SingleOutputStreamOperator<R> select(PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
        PatternProcessFunction<T, R> processFunction = PatternProcessFunctionBuilder.fromSelect((PatternSelectFunction)this.builder.clean(patternSelectFunction)).build();
        return this.process(processFunction, outTypeInfo);
    }

Both ultimately go through PatternStreamBuilder#build() to produce a SingleOutputStreamOperator (a class that extends DataStream):

<OUT, K> SingleOutputStreamOperator<OUT> build(TypeInformation<OUT> outTypeInfo, PatternProcessFunction<IN, OUT> processFunction) {
    Preconditions.checkNotNull(outTypeInfo);
    Preconditions.checkNotNull(processFunction);
    TypeSerializer<IN> inputSerializer = this.inputStream.getType().createSerializer(this.inputStream.getExecutionConfig());
    boolean isProcessingTime = this.inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
    boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
    NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(this.pattern, timeoutHandling);
    
    // >>>>> finally encapsulates the logical calculations to be processed in the following CepOperator <<<<<
    CepOperator<IN, K, OUT> operator = new CepOperator(inputSerializer, isProcessingTime, nfaFactory, this.comparator, this.pattern.getAfterMatchSkipStrategy(), processFunction, this.lateDataOutputTag);
    
    SingleOutputStreamOperator patternStream;
    if (this.inputStream instanceof KeyedStream) {
        KeyedStream<IN, K> keyedStream = (KeyedStream)this.inputStream;
        patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);
    } else {
        KeySelector<IN, Byte> keySelector = new NullByteKeySelector();
        patternStream = this.inputStream.keyBy(keySelector).transform("GlobalCepOperator", outTypeInfo, operator).forceNonParallel();
    }

    return patternStream;
}

The processElement method in CepOperator contains the logic applied to each record:

    public void processElement(StreamRecord<IN> element) throws Exception {
        long currentTime;
        if (this.isProcessingTime) {
            if (this.comparator == null) {
                NFAState nfaState = this.getNFAState();
                long timestamp = this.getProcessingTimeService().getCurrentProcessingTime();
                this.advanceTime(nfaState, timestamp);
                this.processEvent(nfaState, element.getValue(), timestamp);
                this.updateNFA(nfaState);
            } else {
                currentTime = this.timerService.currentProcessingTime();
                this.bufferEvent(element.getValue(), currentTime);
                this.timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, currentTime + 1L);
            }
        } else {
            currentTime = element.getTimestamp();
            IN value = element.getValue();
            if (currentTime > this.lastWatermark) {
                this.saveRegisterWatermarkTimer();
                this.bufferEvent(value, currentTime);
            } else if (this.lateDataOutputTag != null) {
                this.output.collect(this.lateDataOutputTag, element);
            }
        }
    }

Because CepOperator also implements the Triggerable interface, its timer callbacks are executed as well. After the buffered events have been processed, the resulting state is persisted by the updateNFA method:

    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
        PriorityQueue<Long> sortedTimestamps = this.getSortedTimestamps();

        NFAState nfaState;
        long timestamp;
        for(nfaState = this.getNFAState(); !sortedTimestamps.isEmpty() && (Long)sortedTimestamps.peek() <= this.timerService.currentWatermark(); this.elementQueueState.remove(timestamp)) {
            timestamp = (Long)sortedTimestamps.poll();
            this.advanceTime(nfaState, timestamp);
            Stream<IN> elements = this.sort((Collection)this.elementQueueState.get(timestamp));
            Throwable var7 = null;

            try {
                elements.forEachOrdered((event) -> {
                    try {
                        this.processEvent(nfaState, event, timestamp);
                    } catch (Exception var6) {
                        throw new RuntimeException(var6);
                    }
                });
            } catch (Throwable var16) {
                var7 = var16;
                throw var16;
            } finally {
                if (elements != null) {
                    if (var7 != null) {
                        try {
                            elements.close();
                        } catch (Throwable var15) {
                            var7.addSuppressed(var15);
                        }
                    } else {
                        elements.close();
                    }
                }
            }
        }

        this.advanceTime(nfaState, this.timerService.currentWatermark());
        this.updateNFA(nfaState); // <--
        if (!sortedTimestamps.isEmpty() || !this.partialMatches.isEmpty()) {
            this.saveRegisterWatermarkTimer();
        }

        this.updateLastSeenWatermark(this.timerService.currentWatermark());
    }

    ...

    private void updateNFA(NFAState nfaState) throws IOException {
        if (nfaState.isStateChanged()) {
            nfaState.resetStateChanged();
            this.computationStates.update(nfaState);
        }
    }
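The event-time path above boils down to two rules: buffer each element under its timestamp, and when a watermark arrives, drain the buffered timestamps in ascending order up to that watermark. The following plain-Java sketch models only that ordering logic. EventTimeBuffer, add and onWatermark are hypothetical names that stand in for bufferEvent, elementQueueState and the loop in onEventTime; the NFA itself is not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Plain-Java model of event-time buffering; NOT the CepOperator implementation.
class EventTimeBuffer {
    private final Map<Long, List<String>> buffered = new HashMap<>();
    private long lastWatermark = Long.MIN_VALUE;

    /** Buffers an element; returns false if it is late (at or behind the watermark). */
    boolean add(String element, long timestamp) {
        if (timestamp <= lastWatermark) {
            return false;
        }
        buffered.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(element);
        return true;
    }

    /** Advances the watermark and returns every element with timestamp <= watermark, in timestamp order. */
    List<String> onWatermark(long watermark) {
        PriorityQueue<Long> sorted = new PriorityQueue<>(buffered.keySet());
        List<String> ready = new ArrayList<>();
        while (!sorted.isEmpty() && sorted.peek() <= watermark) {
            long ts = sorted.poll();
            ready.addAll(buffered.remove(ts));
        }
        lastWatermark = watermark;
        return ready;
    }

    public static void main(String[] args) {
        EventTimeBuffer buffer = new EventTimeBuffer();
        buffer.add("c", 30);
        buffer.add("a", 10);
        buffer.add("b", 20);
        System.out.println(buffer.onWatermark(25)); // [a, b]
        System.out.println(buffer.add("late", 15)); // false
        System.out.println(buffer.onWatermark(40)); // [c]
    }
}
```

Elements arrive out of order but are released in timestamp order, and an element behind the last watermark is rejected, which corresponds to the lateDataOutputTag branch in processElement.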

2.7 Common Sources & Connectors

2.7.1 File-based

We can easily read data from a local file when testing in a local environment:

readTextFile(path)
readFile(fileInputFormat, path)
...

The file-reading methods that Flink supports can be found directly in ExecutionEnvironment and StreamExecutionEnvironment, for example:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// read text file from an HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                               .types(Integer.class, String.class, Double.class);
// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvFields = env.readCsvFile("hdfs:///the/CSV/file")
                               .includeFields("10010")  // take the first and the fourth field
                               .types(String.class, Double.class);
// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person> csvPersons = env.readCsvFile("hdfs:///the/CSV/file")
                               .pojoType(Person.class, "name", "age", "zipcode");

2.7.2 Collection-based

We can also create our own Source from in-memory collections, objects, and so on. This is generally used for local debugging or validation.

For example:

fromCollection(Collection)
fromElements(T ...)

These methods can be found in the Flink source code and are used as follows:

DataSet<String> text = env.fromElements(
      "Flink Spark Storm", "Flink Flink Flink", "Spark Spark Spark", "Storm Storm Storm"
);
List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
data.add(new Tuple3<>(0, 1, 0));
data.add(new Tuple3<>(0, 1, 1));
data.add(new Tuple3<>(0, 2, 2));
data.add(new Tuple3<>(0, 1, 3));
data.add(new Tuple3<>(1, 2, 5));
data.add(new Tuple3<>(1, 2, 9));
data.add(new Tuple3<>(1, 2, 11));
data.add(new Tuple3<>(1, 2, 13));
DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env.fromCollection(data);

2.7.3 Socket-based

By listening on a socket port, we can easily simulate a real-time computing environment locally.

StreamExecutionEnvironment provides the socketTextStream method, which reads text data from a socket given a host and a port.

DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");

2.7.4 Custom Source

Refer to section 1.4.2.1

2.7.5 Bundled Connectors

Flink ships with a wide variety of connectors for third-party systems. The following can be found on the official website:

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)

Note that using these connectors usually requires adding the corresponding JAR dependencies. Some connectors, such as Kafka, also have version requirements, so be sure to look up the matching dependency version on the official website.

2.7.6 Connectors Released through Apache Bahir

Flink also releases some connectors, such as Redis, through Apache Bahir.

The code for Apache Bahir was originally extracted from the Apache Spark project and later provided as a separate project. Apache Bahir initially extended the reach of the analytics platform by providing a variety of Streaming Connectors and SQL data sources. It is also currently available for Apache Flink, with potential extensions for Apache Beam and more platforms.

We can find the currently supported Flink connectors on Bahir’s home page:

  • Flink streaming connector for ActiveMQ
  • Flink streaming connector for Akka
  • Flink streaming connector for Flume
  • Flink streaming connector for InfluxDB
  • Flink streaming connector for Kudu
  • Flink streaming connector for Redis
  • Flink streaming connector for Netty

Among them is the Redis connector we are most familiar with:

Local single-machine mode:

public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
    }
    @Override
    public String getKeyFromData(Tuple2<String, String> data) {
        return data.f0;
    }
    @Override
    public String getValueFromData(Tuple2<String, String> data) {
        return data.f1;
    }
}

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();

DataStream<Tuple2<String, String>> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));

Cluster mode:

FlinkJedisClusterConfig conf = new FlinkJedisClusterConfig.Builder()
    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();

DataStream<Tuple2<String, String>> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));

Sentinel mode:

FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
    .setMasterName("master").setSentinels(...).build();

DataStream<Tuple2<String, String>> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));

2.7.7 Connectors Based on Asynchronous I/O and Queryable State

Asynchronous I/O and queryable state are both very low-level ways that Flink provides to interact with external systems.

Asynchronous I/O is designed to solve the latency problem when Flink accesses external storage during real-time computation. If we use a MapFunction in the traditional way, every access to the external system is synchronous. In many cases, computing performance is then constrained by the response time of the external system, and the long waits lead to low overall throughput.
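The difference can be sketched without Flink, using plain CompletableFuture. This is only a model of the idea, not Flink's AsyncFunction API: AsyncLookupDemo, slowLookup, lookupSync and lookupAsync are hypothetical names, and the 50 ms sleep is an assumed stand-in for the latency of an external system.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java comparison of synchronous and asynchronous lookups.
class AsyncLookupDemo {

    /** Stand-in for a slow external system (assumed latency: about 50 ms per request). */
    static String slowLookup(String key) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key + "-value";
    }

    /** Synchronous style: each request waits for the previous one to finish. */
    static List<String> lookupSync(List<String> keys) {
        List<String> results = new ArrayList<>();
        for (String key : keys) {
            results.add(slowLookup(key));
        }
        return results;
    }

    /** Asynchronous style: all requests are in flight at once; results keep the input order. */
    static List<String> lookupAsync(List<String> keys) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, keys.size()));
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String key : keys) {
                futures.add(CompletableFuture.supplyAsync(() -> slowLookup(key), pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("k1", "k2", "k3", "k4");
        long t0 = System.nanoTime();
        lookupSync(keys);
        long t1 = System.nanoTime();
        lookupAsync(keys);
        long t2 = System.nanoTime();
        System.out.println("sync  ms: " + (t1 - t0) / 1_000_000);
        System.out.println("async ms: " + (t2 - t1) / 1_000_000);
    }
}
```

With four keys the synchronous version waits for four lookups one after another, while the asynchronous version waits roughly as long as the slowest single lookup; exact timings depend on the machine.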

We can use asynchronous I/O by extending RichAsyncFunction:

/**
 * An implementation of 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** A database client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // Issue the asynchronous request and receive a future for the result
        final Future<String> result = client.query(key);

        // Set the callback to be executed once the client has completed the request;
        // the callback simply forwards the result to the ResultFuture
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Handle exceptions explicitly
                    return null;
                }
            }
        }).thenAccept((String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// Create the initial DataStream
DataStream<String> stream = ...;

// Apply the asynchronous I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

ResultFuture's complete method is asynchronous: the call hands over the result and returns without waiting.