This article is based on the Lagou (lagou.com) column "42 Lessons to Easily Master Flink". Lagou hosts many high-quality columns; I am a beginner, and most of my technical knowledge comes from Lagou and Git. From time to time I publish my own summaries on Juejin.

III. Production Practice

3.1 HA Configuration in the Production Environment

HA (High Availability) describes a system that has been specifically designed to minimize downtime and keep its services continuously available.

As mentioned earlier, the JobManager is the cluster manager in a Flink cluster. It is responsible for scheduling tasks, coordinating Checkpoints, coordinating failure recovery, collecting Job status, and managing the worker (TaskManager) nodes in the Flink cluster.

By default, we have only one instance of JobManager per cluster, and if the JobManager crashes, our jobs will fail and we will not be able to submit new tasks.

So how should our clusters be configured for high availability in a production environment? We need different configurations for clusters that are deployed in different modes.

3.1.1 Source code analysis

Components such as the JobManager and WebServer in Flink require high availability, and Flink also needs to persist Checkpoint metadata. The class diagram for Flink HA is shown below; let's follow the source code and take a brief look at how Flink HA is implemented.

The HighAvailabilityMode class defines three HA mode enumerations, as shown in the figure below:

  • NONE: the non-HA mode
  • ZOOKEEPER: HA implemented on top of ZooKeeper
  • FACTORY_CLASS: a custom HA factory class, which must implement the HighAvailabilityServicesFactory interface

The creation of the concrete high-availability service instances happens in the HighAvailabilityServicesUtils class, as shown below:

To create an instance of HighAvailabilityServices, do the following:

public static HighAvailabilityServices createHighAvailabilityServices(
		Configuration configuration,
		Executor executor,
		AddressResolution addressResolution) throws Exception {
	HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
	switch (highAvailabilityMode) {
		case NONE:
			// Omit some code
			// Returns the non-HA service class instance
			return new StandaloneHaServices(
				resourceManagerRpcUrl,
				dispatcherRpcUrl,
				jobManagerRpcUrl,
				String.format("%s%s:%s", protocol, address, port));
		case ZOOKEEPER:
			BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

			// Returns the ZooKeeper HA service class instance
			return new ZooKeeperHaServices(
				ZooKeeperUtils.startCuratorFramework(configuration),
				executor,
				configuration,
				blobStoreService);
		case FACTORY_CLASS:
			// Returns the custom HA service class instance
			return createCustomHAServices(configuration, executor);
		default:
			throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
	}
}

The HighAvailabilityServices interface defines the methods that HA service classes should implement:

  • StandaloneHaServices (non-HA)

  • ZooKeeperHaServices: mainly provides methods for creating the LeaderRetrievalService and LeaderElectionService, as well as the names of the ZK nodes used by each service component

    • Among them, ZooKeeperLeaderElectionService implements the leader election and leader retrieval methods defined in LeaderElectionService:

      public interface LeaderElectionService {
      	// Start the leader election service
      	void start(LeaderContender contender) throws Exception;
      	// Stop the leader election service
      	void stop() throws Exception;
      	// Confirm the new leader session ID
      	void confirmLeaderSessionID(UUID leaderSessionID);
      	// Whether the contender with the given session ID holds leadership
      	boolean hasLeadership(@Nonnull UUID leaderSessionId);
      }
  • YarnHighAvailabilityServices.
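
For reference, below is a hedged sketch of what the FACTORY_CLASS mode might look like in user code: a custom factory implementing the HighAvailabilityServicesFactory interface mentioned above. For illustration it simply delegates to the ZooKeeper-based services shown earlier; a real factory would return its own HighAvailabilityServices implementation, and the exact factory method signature may differ slightly across Flink versions:

public class CustomHaServicesFactory implements HighAvailabilityServicesFactory {
    @Override
    public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) throws Exception {
        // Illustrative only: delegate to the ZooKeeper-based services shown earlier
        BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
        return new ZooKeeperHaServices(
                ZooKeeperUtils.startCuratorFramework(configuration),
                executor,
                configuration,
                blobStoreService);
    }
}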

3.1.2 Standalone Cluster High availability configuration

(Omitted.)

3.1.3 YARN Cluster HA Configuration

The high-availability setup of Flink on YARN requires only one JobManager. When the JobManager fails, YARN is responsible for restarting it.

  1. Modify the configuration in yarn-site.xml

    <property>
      <!-- Maximum number of retries for the YARN ApplicationMaster -->
      <name>yarn.resourcemanager.am.max-attempts</name>
      <value>4</value>
      <description>
        The maximum number of application master execution attempts.
      </description>
    </property>
  2. Set the maximum number of attempts in flink-conf.yaml

    # The default value is 2
    yarn.application-attempts: 10
    • The above configuration means:

      • If the application fails to start, YARN retries up to 9 more times (1 start + 9 retries);

      • YARN marks the job as failed only after all 10 launch attempts have been exhausted;

      • If preemption occurs, a node's hardware fails or restarts, or the NodeManager resynchronizes, YARN also restarts the application, but these restarts are not counted toward yarn.application-attempts.

[Important note from the official website] Container restart behavior varies across YARN versions:

  • YARN 2.3.0 <= version < 2.4.0: all containers are restarted if the ApplicationMaster fails.
  • YARN 2.4.0 <= version < 2.6.0: TaskManager containers are kept alive across ApplicationMaster failures. This has the advantage of faster recovery, since the user does not have to wait to obtain container resources again.
  • YARN 2.6.0 <= version: the attempt-failure validity interval is set to Flink's Akka timeout value. An application is only killed if the system sees the maximum number of application attempts within one such interval, which prevents a long-running job from depleting its application attempts.

If your ZooKeeper cluster is running in Kerberos security mode, you can add the following configuration as required:

zookeeper.sasl.service-name
zookeeper.sasl.login-context-name

If you don't want to set up a dedicated ZooKeeper cluster, or you simply want to test locally, you can use the ZooKeeper instance that ships with Flink, but this is not recommended for production; we recommend running your own ZooKeeper cluster.
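
For reference, a typical ZooKeeper HA configuration in flink-conf.yaml looks like the sketch below; the hostnames and paths are placeholders:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /my-cluster
high-availability.storageDir: hdfs:///flink/ha/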

3.2 Exactly-once implementation principle

1. In any case, each piece of data is guaranteed to affect the application exactly once, no more and no less.

3.2.1 Background

Typically, streaming systems let users specify the reliability semantics of data processing, i.e., what guarantees the system makes when the job actually runs in production. In general, a stream processing engine provides three data processing semantics for user applications:

  • At most once: the easiest semantics to understand. Each piece of data is processed at most once; whether it succeeds or fails, there is no retry or retransmission.
  • At least once: the system guarantees that each piece of data or event is processed at least once. If an error or loss occurs midway, the event is re-sent from the source into the processing system, so the same event or message may be processed multiple times.
  • Exactly once: each piece of data is processed exactly once, no more, no less.

Flink officially calls this "End-to-End Exactly-Once": the guarantee covers the whole path a Flink application takes from Source to Sink. Flink itself cannot guarantee exactly-once semantics inside external systems, so to achieve so-called end-to-end exactly-once, the external systems must themselves support exactly-once semantics; on top of that, the distributed snapshots and two-phase commit provided by Flink make it achievable.

3.2.3 Distributed Snapshot mechanism

Flink provides a fault-tolerance mechanism for failure recovery, and the core of this mechanism is the continuous creation of snapshots of the distributed data flow.

Compared with Spark, which only performs Checkpoint-based recovery of Driver failures, Flink's snapshots can be taken at the operator level, and the global data flow can also be snapshotted.

3.2.3.1 Data Barrier

One of the core elements of Flink's distributed snapshots is the Barrier, which can be thought of simply as a marker that is strictly ordered and flows along with the data stream. Each Barrier carries its own ID, is extremely lightweight, and does not interfere with normal data processing.

As shown in the figure above, with a data flow moving from left to right, Flink generates snapshot 1, snapshot 2, snapshot 3, and so on. Flink has a dedicated "coordinator" (the CheckpointCoordinator) responsible for collecting the location information of each snapshot, and this coordinator is itself highly available.

The Barrier flows downstream together with the normal data. Once an operator has received barrier n from all of its upstream input streams, it inserts a barrier for snapshot n into all of its outgoing streams. When a sink operator has received barrier n from all of its upstream streams, it means this batch of data has been fully processed, and Flink sends a confirmation to the "coordinator" indicating that snapshot n is complete at that sink. Only when all sink operators have confirmed is the snapshot marked as completed.

Question: Flink runs in a distributed environment, and an operator may have many upstream streams. What happens if barrier n arrives at different times on different streams? Flink's answer is barrier alignment: fast streams wait for slow streams.

For example:

In the diagram above, one barrier n arrives early while the others arrive late. When the first barrier n arrives, the current operator keeps waiting for the other streams. Only after barrier n has arrived on all inputs does the operator emit the barrier and continue sending data downstream.

3.2.3.2 Asynchronous & Incremental

Following the mechanism described above, each snapshot must be stored to the state backend. If this happens synchronously, it blocks normal processing and introduces latency, so Flink can store snapshots asynchronously.

In addition, since a checkpoint captures global state, the state saved by users can be very large, often at the GB or TB level. In that case checkpoints are slow to create and consume many resources, so Flink introduced the concept of incremental snapshots: each checkpoint stores only the changes made since the previous checkpoint, instead of a full copy.
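
As a reference point, incremental snapshots can be switched on through the RocksDB state backend. A minimal sketch, assuming the Flink 1.10-era RocksDBStateBackend API; the checkpoint interval and HDFS path are placeholders:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // trigger a checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// the second constructor argument enables incremental checkpoints
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));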

3.2.4 Two-phase Commit

In the previous section we explained checkpoint snapshots. The snapshot mechanism ensures that a failed job can fail over and recover from the latest snapshot; in other words, the distributed snapshot mechanism guarantees exactly-once processing inside the Flink system. In real production systems, however, Flink connects to all kinds of external systems, such as Kafka and HDFS. If Flink fails, the job will re-consume the old data after recovery, which means duplicate consumption.

For this situation, Flink 1.4 introduced a very important feature: two-phase commit, i.e., TwoPhaseCommitSinkFunction. Combined with specific sources and sinks (especially Kafka 0.11), it makes "exactly-once processing semantics" possible end to end.

In Flink, two-phase commit is encapsulated in the abstract class TwoPhaseCommitSinkFunction. We only need to implement its beginTransaction, preCommit, commit, and abort methods to achieve exactly-once processing semantics. The reference implementation can be found on the official website:

beginTransaction: before starting the transaction, we create a temporary file in a temporary directory of the target file system; while processing data later, we write the data into this file;

preCommit: in the pre-commit phase, we flush the file and then close it, after which nothing more can be written to it; any subsequent writes belong to the next checkpoint and go into a new transaction;

commit: in the commit phase, we atomically move the pre-committed file into the real target directory; note that this adds some latency to the visibility of the output data;

abort: in the abort phase, we delete the temporary file.
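
To make the four steps concrete, here is a skeletal, hypothetical file sink written against TwoPhaseCommitSinkFunction. The class name, file paths, and serializer choices are illustrative, and the abstract method signatures can differ slightly across Flink versions:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.*;
import java.util.UUID;

public class FileTwoPhaseCommitSink
        extends TwoPhaseCommitSinkFunction<String, FileTwoPhaseCommitSink.FileTransaction, Void> {

    // Transaction handle: remembers which temp file this transaction writes to
    public static class FileTransaction {
        public String tempPath;
        public transient BufferedWriter writer;
    }

    public FileTwoPhaseCommitSink() {
        super(new KryoSerializer<>(FileTransaction.class, new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected FileTransaction beginTransaction() throws Exception {
        // beginTransaction: create a temp file in a temporary directory of the target file system
        FileTransaction txn = new FileTransaction();
        Files.createDirectories(Paths.get("/tmp/flink-2pc"));
        txn.tempPath = "/tmp/flink-2pc/" + UUID.randomUUID() + ".tmp";
        txn.writer = Files.newBufferedWriter(Paths.get(txn.tempPath));
        return txn;
    }

    @Override
    protected void invoke(FileTransaction txn, String value, Context context) throws Exception {
        txn.writer.write(value);
        txn.writer.newLine();
    }

    @Override
    protected void preCommit(FileTransaction txn) throws Exception {
        // preCommit: flush and close the file; later writes belong to the next transaction
        txn.writer.flush();
        txn.writer.close();
    }

    @Override
    protected void commit(FileTransaction txn) {
        // commit: atomically move the pre-committed file into the real target directory
        try {
            Path target = Paths.get("/tmp/flink-2pc-out", Paths.get(txn.tempPath).getFileName().toString());
            Files.createDirectories(target.getParent());
            Files.move(Paths.get(txn.tempPath), target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void abort(FileTransaction txn) {
        // abort: delete the temporary file
        try {
            Files.deleteIfExists(Paths.get(txn.tempPath));
        } catch (IOException ignored) {
        }
    }
}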

3.2.5 Flink – Kafka Exactly Once

As shown in the figure above, we use a kafka-flink-kafka example to illustrate how "end-to-end exactly-once" semantics are achieved. The whole process includes:

  • Read data from Kafka
  • Window aggregation operation
  • Write the data back to Kafka

The whole process can be summarized as the following four stages:

  • Once Flink starts a checkpoint, it enters the pre-commit phase, and the Flink JobManager injects checkpoint barriers into the data stream;
  • When all barriers have successfully passed through the operators and the snapshot is complete, the pre-commit phase is done;
  • When all operators have finished their pre-commit, a "commit" action is initiated; any "pre-commit" failure causes Flink to roll back to the nearest checkpoint;
  • Once pre-commit completes, the commit must be guaranteed to eventually succeed as well; the Sink operators and the Kafka sink above ensure this.

3.2.6 Status Quo

The sources for which Flink currently supports exactly-once semantics are listed in the following table; you can use the corresponding connector to achieve the corresponding semantic guarantee:

Data source              Semantic guarantee                            Note
Apache Kafka             exactly once                                  Requires the corresponding Kafka version
AWS Kinesis Streams      exactly once
RabbitMQ                 at most once (v0.10) / exactly once (v1.0)
Twitter Streaming API    at most once
Collections              exactly once
Files                    exactly once
Sockets                  at most once

To achieve true end-to-end exactly-once semantics, the sink must cooperate as well. The sinks currently supported by Flink are listed below:

Write target             Semantic guarantee              Note
HDFS rolling sink        exactly once                    Depends on the Hadoop version
Elasticsearch            at least once
Kafka producer           at least once / exactly once    Exactly once requires Kafka 0.11 or above
Cassandra sink           at least once / exactly once    Exactly once only for idempotent updates
AWS Kinesis Streams      at least once
File sinks               at least once
Socket sinks             at least once
Standard output          at least once
Redis sink               at least once

3.3 Backpressure in production environment

How backpressure is handled directly affects a task's resource utilization and stable operation.

Backpressure is a problem frequently encountered in streaming systems. If a backpressured node exists in your task, it means the task consumes data more slowly than the data is produced, and the production rate needs to be throttled. Backpressure typically shows up during big promotions, trending events, and similar scenarios, which share a common trait: a sudden surge of traffic in a short period causes data to pile up or consumption to slow down.

3.3.1 Backpressure in Different Frameworks

  1. Storm

    Since version 1.0, Storm has introduced a new backpressure mechanism: it actively monitors worker nodes. When the data received by a worker node exceeds a certain watermark, a backpressure message is sent to ZooKeeper, ZooKeeper notifies all worker nodes to enter the backpressure state, and finally the data source slows down its sending rate.

  2. Spark Streaming

    Spark Streaming adds a dedicated RateController component on top of the original architecture, which uses the classic PID algorithm. It feeds the controller several important processing attributes, such as the number of messages processed, the scheduling delay, and the processing time, then computes a rate from these parameters. That rate represents the maximum processing capacity of the current system, and Spark Streaming throttles the data producer accordingly.

  3. Flink

    Flink relies on its network transport and dynamic rate limiting. In Flink's design philosophy, pure stream computation gives Flink a natural advantage in handling backpressure. A Flink task is composed of basic "streams" and "operators": as the "operators" compute and transform the data, the data in a "stream" is placed into distributed blocking queues. When a consumer's blocking queue fills up, the producer's production rate is reduced.

3.3.2 Locating Backpressure

When your task is backpressured and the upstream is a messaging system such as Kafka, the symptom is obvious: consumption slows down and Kafka messages pile up.

If your business has low requirements on data latency, backpressure may not matter much. However, for large jobs on large clusters, backpressure can cause serious "complications":

  • The task state becomes very large, because data piles up in the system on a large scale and the temporarily unprocessed data is also kept in "state";
  • Checkpoints time out because data accumulates and processing slows down. Checkpointing is the key to Flink's data consistency guarantee, so this ultimately leads to data inconsistency.

3.3.2.1 Flink Web UI

Flink's web UI is our first choice for spotting backpressure problems; it shows the current job's running status intuitively and clearly.

As shown in the figure above, this is the backpressure-state example given on Flink's official website. Note that the backpressure computation is only triggered when a user opens and clicks into a job. By default, the JobManager asks the TaskManagers to sample each task's threads 100 times at 50 ms intervals; the results are reported back to the JobManager, which computes the backpressure ratio and displays it.

The ratio is interpreted as follows:

  • OK: 0 <= Ratio <= 0.10, normal;
  • LOW: 0.10 < Ratio <= 0.5, moderate;
  • HIGH: 0.5 < Ratio <= 1, severe.

3.3.2.2 Flink Metrics

If you want to monitor Flink in more detail, Flink itself provides a number of REST APIs for retrieving the various states of a task. All the system monitoring metrics provided by Flink can be found there.

As versions evolve, up to version 1.10.0 the Flink monitoring metrics most closely related to backpressure are shown in the following table:

Metric name              Purpose
outPoolUsage             Usage of the output (send) buffer pool
inPoolUsage              Usage of the input (receive) buffer pool
floatingBuffersUsage     Usage of the input gate's floating buffers
exclusiveBuffersUsage    Usage of the input channels' exclusive buffers
  • outPoolUsage

This metric reflects the buffer usage on the sending side of the current Task. If a Task's outPoolUsage is high, the data it wants to send is piling up; this usually means the downstream Task is processing slowly and deserves special attention, and the current node is being backpressured, so its data flows out slowly.

  • inPoolUsage

inPoolUsage reflects the buffer usage on the receiving side of the current Task and is read together with outPoolUsage. If a node has a high inPoolUsage but a low outPoolUsage, it is likely the source of the backpressure.

  • floatingBuffersUsage and exclusiveBuffersUsage

floatingBuffersUsage indicates the usage of the input gate's floating buffers, and exclusiveBuffersUsage indicates the usage of the exclusive buffers of each input channel.

3.3.3 Backpressure Solutions

  1. Data skew

    You can see the amount of data processed by each Task on Flink's admin page. Data skew is usually caused by grouping/aggregation functions such as KeyBy; you need to preprocess the hotspot keys to reduce or eliminate their impact.

  2. GC pressure

    You can inspect GC behavior by enabling GC logs with the -XX:+PrintGCDetails JVM option.

  3. The code itself

    Developers sometimes use Flink operators incorrectly, without a deep understanding of how the operators are implemented, causing performance problems. We can locate such problems by examining the CPU and memory usage of the machines the job runs on.

3.4 Data Skew in the production environment

For real-time frameworks such as Flink and Spark, or offline frameworks such as Hive, sheer data volume is never the problem. What really causes trouble, with serious consequences, is data skew!

3.4.1 Principle of Data skew

The big data processing frameworks known to us at present, such as Flink, Spark and Hadoop, can process hundreds of billions of data because they all take advantage of the idea of distributed computing. Multiple computing nodes in a cluster are parallel, so that the data processing capacity can be linearly expanded.

In actual production, Flink runs as a cluster containing two types of processes. The TaskManager is the Worker actually responsible for computation, and a group of Tasks of a Flink Job executes on it; a Task is the container in which our concrete code logic runs. In theory, as long as we have enough Tasks, we can process an arbitrarily large amount of data.

But in practice, problems often appear with large data volumes. Suppose a Flink job contains 200 Task nodes, 199 of which finish their computation very quickly, while one node takes far longer than the rest. As the data volume keeps growing, that compute node dies and the whole task fails and restarts. On the Flink admin page we can see that this one Task handles far more data than the other nodes.

3.4.2 Causes & Solutions

The direct manifestation of data skew in a Flink task is that task nodes frequently show backpressure, and increasing the parallelism does not solve the problem. OOM exceptions occur on some nodes because a large amount of data is concentrated there, exhausting the node's memory and causing the task to fail and restart.

  • The causes of data skew fall into two main categories:
    • The business has severe data hotspots. For example, in Didi's ride-hailing order data, the order volume of cities such as Beijing and Shanghai far exceeds that of other regions;
    • On the technical side, KeyBy and GroupBy are heavily used with badly chosen grouping keys, artificially creating data hotspots.
  • The ideas for solving the problem are equally clear:
    • On the business side, avoid hot keys in the design wherever possible. For example, hot cities such as Beijing and Shanghai can be split into different regions and processed separately;
    • For technical hotspots, adjust the scheme to scatter the original keys and avoid direct aggregation; Flink also provides a number of features to avoid data skew.

3.4.3 Data Skew in Flink & Solutions

3.4.3.1 KeyBy Hotspot Issues

KeyBy is one of the grouping aggregation functions we use frequently. In actual business, we often encounter such scenarios: the top 10 provinces with the highest order volume are aggregated according to the province where the user places the order, or the device type with the highest traffic volume is aggregated according to the type of the user’s mobile phone.

If we simply use KeyBy directly and simulate a basic PV-counting scenario, the code looks like this:

DataStream sourceStream = ...;
WindowedStream windowedStream = sourceStream
          .keyBy("type")
          .window(TumblingEventTimeWindows.of(Time.minutes(1)));
windowedStream.process(new MyPVFunction())
              .addSink(new MySink())...
env.execute()...

When KeyBy is performed according to type, if the type of data is not evenly distributed, a large amount of data will be allocated to a task, resulting in data skew.

【Solution】:

  • First, scatter the grouping key, for example by adding a random suffix;
  • Aggregate the scattered data;
  • Restore the scattered key to the real key;
  • Aggregate a second time with KeyBy on the real key, then output the result.
DataStream sourceStream = ...;
DataStream resultStream = sourceStream
      .map(input -> {
        Record record = JSON.parseObject(input, Record.class);
        String type = record.getType();
        // Scatter the hot key by appending a random suffix
        record.setType(type + "#" + new Random().nextInt(100));
        return record;
      })
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .aggregate(new CountAggregate())
      .map(count -> {
        // Strip the random suffix to restore the real key
        String key = count.getKey().substring(0, count.getKey().indexOf("#"));
        return new RecordCount(key, count.getCount());
      })
      // Secondary aggregation
      .keyBy(0)
      .process(new CountProcessFunction());

resultStream.sink()...
env.execute()...

The CountAggregate function is implemented as follows:

public class CountAggregate implements AggregateFunction<Record, CountRecord, CountRecord> {
    @Override
    public CountRecord createAccumulator() {
        return new CountRecord(null, 0L);
    }
    @Override
    public CountRecord add(Record value, CountRecord accumulator) {
        if (accumulator.getKey() == null) {
            accumulator.setKey(value.key);
        }
        accumulator.setCount(value.count);
        return accumulator;
    }
    @Override
    public CountRecord getResult(CountRecord accumulator) {
        return accumulator;
    }
    @Override
    public CountRecord merge(CountRecord a, CountRecord b) {
        return new CountRecord(a.getKey(), a.getCount() + b.getCount());
    }
}

CountProcessFunction is implemented as follows:

public class CountProcessFunction extends KeyedProcessFunction<String, CountRecord, CountRecord> {
    private transient ValueState<Long> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Keyed state must be initialized in open(), not at field declaration
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(CountRecord value, Context ctx, Collector<CountRecord> out) throws Exception {
        if (state.value() == null) {
            state.update(value.count);
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000L * 5);
        } else {
            state.update(state.value() + value.count);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<CountRecord> out) throws Exception {
        // Business logic can go here, e.g. emit the statistics when the timer fires (after 5 seconds here)
        //out.collect(...);
        // Clear the state
        state.clear();
        // Other operations...
    }
}

In this way, the hot Key can be scattered to eliminate data skew.

3.4.3.2 Hotspot Issues of Group Aggregation

It is very common for businesses to GroupBy group, followed by aggregation operations such as SUM and COUNT. As we all know, the GroupBy function groups by Key, completely depending on the design of Key. If the Key is hot, a huge shuffle will occur, and the data with the same Key will be sent to the same processing node. If the amount of data on a key is too large, the key becomes a computing bottleneck and causes backpressure.

We still count PV grouped as above; the SQL statement is as follows:

select date,
       type,
       sum(count) as pv
from table
group by date,
         type;

We can rewrite the SQL as:

select date,
       type,
       sum(pv) as pv
from (
  select date,
         type,
         sum(count) as pv
  from table
  group by date,
           type,
           floor(rand() * 100) -- scatter into 100 random buckets
)
group by date,
         type;

The SQL above has an inner and an outer layer. The inner layer reduces data hotspots by randomly scattering rows into 100 buckets; the scattering scheme can of course be adapted flexibly to the business.

3.4.3.3 Inconsistent parallelism between upstream and downstream Kafka consumption

When we use Flink to process real-time business, the upstream is usually a messaging system, and Kafka is the most widely used big data messaging system. Data skew also occurs when Kafka data is consumed using Flink.

It is important to note that when Flink consumes Kafka data, it is recommended that the upstream and downstream parallelism be consistent, i.e., the number of Kafka partitions is equal to the parallelism of Flink Consumer.

However, there may be a case where the parallelism of Flink consumers is greater than the number of Kafka partitions in order to speed up data processing. If nothing is done, some Flink Consumer threads will never consume data.

In this case, you need to configure Flink Redistributing, which is data redistribution.

Flink provides up to eight repartitioning strategies, as shown in the following class diagram:

After we receive the Kafka messages, we can use a custom data-partitioning strategy to rebalance the load, for example:

dataStream
        .setParallelism(2)
        // Repartition with the REBALANCE strategy (RESCALE also works)
        .rebalance() //.rescale()
        .print()
        .setParallelism(4);

The rebalance strategy repartitions data in round-robin fashion to balance the global load.

The rescale strategy also outputs data in a round-robin fashion, but only to a subset of downstream instances determined by the upstream and downstream parallelism.


【Summary】

Flink keeps iterating, and new approaches to the data-skew problems we encounter keep emerging, MiniBatch for example; as developers we need to keep discovering and learning new ways of solving them.

3.5 Parallelism & Resource Settings in the Production Environment

3.5.1 Resource calculation in Flink

When we talk about computing resources in Flink, we usually mean the Tasks that execute a job's concrete logic.

It is important to understand the core concepts of computing resources in Flink, such as Slot, Chain, and Task. Understanding these concepts helps developers understand how computing resources in Flink are isolated and managed, and helps us quickly locate problems in production.

3.5.2 Task Slot

In actual production, Flink runs as a cluster with two types of processes, one of which is the TaskManager. A TaskManager is a JVM process that executes tasks in separate threads. To control how many tasks a TaskManager can accept, Flink introduced the concept of the Task Slot.

We can simply think of a Task Slot as a subset of the TaskManager's computing resources. If a TaskManager has five slots, its computing resources are divided into five equal parts; different tasks execute in different slots, avoiding resource contention. Note, however, that slots only isolate memory, not CPU. Tasks running in the same JVM can share TCP connections, which reduces network traffic, improves application efficiency, and reduces resource consumption to some extent.

【Slot sharing 】

By default, Flink also allows subtasks of the same Job to share slots. A Flink task contains many operators with different computational loads: simple map and filter operators need few resources, while operators such as window and group by need more computing power to meet their demands. With slot sharing, resource-hungry operators can use the resources of slots occupied by lighter ones, improving the resource utilization of the whole cluster.
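
A hedged sketch of influencing slot sharing in the DataStream API (roughly the Flink 1.10-era API); the operators and the group name "heavy" are illustrative:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("a", "b", "a")
   .map(String::toUpperCase)      // lightweight operator, stays in the default sharing group
   .keyBy(s -> s)
   .timeWindow(Time.minutes(1))
   .reduce((a, b) -> a)           // stand-in for a resource-heavy aggregation
   .slotSharingGroup("heavy")     // isolate the heavy part in its own slot-sharing group
   .print();
env.execute("slot-sharing-demo");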

3.5.3 Operator Chain

Flink chains the tasks of different operators together to form a new task. This is a very effective optimization that Flink provides: chained tasks execute in the same thread, which reduces thread context switching and the resource consumption of serialization/deserialization, thereby improving the overall throughput of our tasks.

3.5.4 parallelism

Flink uses parallelism to define how many subtasks an operator is sliced into. Our Flink code will be converted into a logical view, which will be converted into the corresponding sub-tasks for execution according to the user’s parallelism configuration during the actual run.

Flink itself supports different levels of methods for setting the parallelism of our tasks, which are:

  • Operator level √ When writing a Flink program, we can explicitly set the parallelism of different operators in the code. Take the classic WordCount program as an example:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> text = ...

    DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new LineSplitter())
        .setParallelism(10) // Explicitly sets the parallelism of this operator
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1).setParallelism(1);
    wordCounts.print();
    env.execute("word count");
    • In actual production, it is recommended to specify parallelism at the operator level, which makes it explicit and allows precise resource control.
  • Environment level: we can set the parallelism of the entire job by calling env.setParallelism():

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(5);
    ...

    • Once this is set, all operators in the job run with the specified parallelism, which is not recommended in production environments.
  • Client level: we can specify the parallelism when submitting the Flink Job from the command line. If the code does not set a parallelism, the value passed at submission time is used when the task executes:

    # The -p flag specifies the parallelism at submission time
    ./bin/flink run -p 5 ../wordCount-java*.jar
  • Finally, our flink-conf.yaml file has one parameter, parallelism.default, which takes effect if the user does not set any other parallelism configuration:
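    For reference (1 is the value Flink ships with):

    # Cluster-wide default parallelism
    parallelism.default: 1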

[Parallelism Priority] Operator level (recommended in production environments) > Environment level > Client level > Cluster configuration level

3.6 Job monitoring in the production environment

In actual production, Flink's web UI makes it easy to inspect the JobManager, the TaskManagers, the execution plan, slot allocation, backpressure status, and other parameters, which is convenient for troubleshooting a single task.

However, for many large and medium-sized enterprises, what we care more about when managing cluster jobs is the fine-grained, real-time running status of the jobs. For example, a dedicated monitoring system may be needed to watch the cluster's tasks: year-over-year comparisons of real-time throughput, an overview of all jobs running in the cluster, the cluster's water level (overall load), or the health of an ETL framework implemented on Flink.

3.6.1 Flink Metrics

Flink Metrics is a set of operational information collection libraries implemented by Flink. We can collect system Metrics provided by Flink itself, such as CPU, memory, thread usage, JVM garbage collection, network, IO, etc. User-defined metrics can also be collected by inheriting and implementing specified classes or interfaces.

3.6.1.1 Functions

  • Collect, in real time, the Metrics inside Flink or the custom metric information users need, and display them;
  • Expose this information through the REST API provided by Flink, so it can be collected and displayed by third-party systems.

3.6.1.2 Four types of monitoring indicators:

  1. Counter: a counter, used to accumulate the total amount of a metric, such as the total number of input or output records.

    public class MyMapper extends RichMapFunction<String, String> {
      private transient Counter counter;
      @Override
      public void open(Configuration config) {
        this.counter = getRuntimeContext()
          .getMetricGroup()
          .counter("MyCounter");
      }
      @Override
      public String map(String value) throws Exception {
        this.counter.inc();
        return value;
      }
    }
  2. Gauge: reports the instantaneous value of a metric, for example the memory usage of a node in Flink, or the number of records a map operator has emitted.

    public class MyMapper extends RichMapFunction<String, String> {
      private transient long valueNumber = 0L;
      @Override
      public void open(Configuration config) {
        getRuntimeContext()
          .getMetricGroup()
          .gauge("MyGauge", new Gauge<Long>() {
            @Override
            public Long getValue() {
              return valueNumber;
            }
          });
      }
      @Override
      public String map(String value) throws Exception {
        valueNumber++;
        return value;
      }
    }
  3. Meter: used to measure the average rate of a metric.

    public class MyMapper extends RichMapFunction<Long, Integer> {
      private transient Meter meter;
      @Override
      public void open(Configuration config) {
        this.meter = getRuntimeContext()
          .getMetricGroup()
          .meter("myMeter", new MyMeter());
      }
      @Override
      public Integer map(Long value) throws Exception {
        this.meter.markEvent();
        return value.intValue();
      }
    }
  4. Histogram: a histogram. Very few of Flink's built-in metrics are histograms; they are typically used to compute the maximum, minimum, and median of a metric.

    public class MyMapper extends RichMapFunction<Long, Integer> {
      private transient Histogram histogram;
      @Override
      public void open(Configuration config) {
        this.histogram = getRuntimeContext()
          .getMetricGroup()
          .histogram("myHistogram", new MyHistogram());
      }
      @Override
      public Integer map(Long value) throws Exception {
        this.histogram.update(value);
        return value.intValue();
      }
    }
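
Note that MyMeter and MyHistogram in the examples above are placeholders for user-supplied implementations. For reference, Flink ships a built-in MeterView that measures a rate over a time span; a sketch (the metric name is arbitrary):

this.meter = getRuntimeContext()
    .getMetricGroup()
    .meter("myMeter", new MeterView(60)); // events per second over the last 60 seconds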

Metrics in Flink form a multi-layer structure organized as Groups; the unique identifier we use to locate a metric is Metric Group + Metric Name.

3.6.1.3 Obtaining Metrics

There are several ways to obtain Metrics:

  • First, we can view some metrics on Flink's web UI;
  • Second, we can query the status of Flink tasks through the HTTP interface Flink provides; since the HTTP interface returns JSON, the responses are easy to parse;
  • Finally, there is the Metric Reporter.

The latter two are explained in detail below.

3.6.1.3.1 Flink HTTP Interface

Flink provides rich interfaces to help us query task status. All requests are made by visiting http://hostname:8081/ with the specific URI appended. The full list of HTTP interfaces supported by Flink can be found in the official documentation.

/config
/overview
/jobs
/joboverview/running
/joboverview/completed
/jobs/<jobid>
/jobs/<jobid>/vertices
/jobs/<jobid>/config
/jobs/<jobid>/exceptions
/jobs/<jobid>/accumulators
/jobs/<jobid>/vertices/<vertexid>
/jobs/<jobid>/vertices/<vertexid>/subtasktimes
/jobs/<jobid>/vertices/<vertexid>/taskmanagers
/jobs/<jobid>/vertices/<vertexid>/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
/jobs/<jobid>/plan
/jars/upload
/jars
/jars/:jarid
/jars/:jarid/plan
/jars/:jarid/run

[Example] Accessing the /joboverview endpoint returns a summary of all tasks in the cluster; the result looks similar to the following:

{
  "running": [],
  "finished": [
    {
      "jid": "7684be6004e4e955c2a558a9bc463f65",
      "name": "Flink Java Job at Wed Sep 16 18:08:21 CEST 2015",
      "state": "FINISHED",
      "start-time": 1442419702857,
      "end-time": 1442419975312,
      "duration": 272455,
      "last-modification": 1442419975312,
      "tasks": {
        "total": 6,
        "pending": 0,
        "running": 0,
        "finished": 6,
        "canceling": 0,
        "canceled": 0,
        "failed": 0
      }
    },
    {
      "jid": "49306f94d0920216b636e8dd503a6409",
      "name": "Flink Java Job at Wed Sep 16 18:16:39 CEST 2015",
      ...
    }
  ]
}
3.6.1.3.2 Flink Reporter

Flink also offers a number of built-in reporters, which can be found on the Flink website.

For example, Flink offers built-in Reporters such as Graphite, InfluxDB, Prometheus, etc., so we can easily integrate these external systems. Detailed configurations can also be found on the Flink website’s details page.


Here we take as an example a Flink cluster task monitoring setup integrating Flink with InfluxDB and Grafana. In this monitoring system, InfluxDB acts as the metrics store and Grafana as the data presenter.

  • The installation of InfluxDB

    • InfluxDB is an open-source time-series database developed by InfluxData and written in Go, focused on high-performance querying and storage of time-series data. InfluxDB is widely used for system monitoring data and real-time data in the IoT industry.

    • The InfluxDB installation process is very simple and we won't go into it here; what needs attention is modifying the InfluxDB configuration file /etc/influxdb/influxdb.conf:

      [admin]
         enabled = true
         bind-address = ":8083"
    • Open the console for InfluxDB through port 8083

  • The installation of Grafana

    • Grafana’s default account and password are admin and admin respectively, which can be accessed through port 3000.
  • Modify flink-conf.yaml

    • Add the following configuration to the flink configuration file:

      metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
      metrics.reporter.influxdb.host: xxx.xxx.xxx.xxx
      metrics.reporter.influxdb.port: 8086
      metrics.reporter.influxdb.db: flink
    • Meanwhile, copy the flink-metrics-influxdb-1.10.0.jar package into Flink's /lib directory and start Flink.

    • Finally, you can see the Metrics information in Grafana

In fact, common Flink real-time monitoring stacks, including but not limited to Prometheus + Grafana and shipping Flink logs into ELK, are all available to choose from. Considering ease of use, stability, and integration cost, we recommend Prometheus/InfluxDB + Grafana for actual monitoring.

3.7 Dimensional table association

In real production, we often need to enrich the raw data stream by joining a large number of external tables to supplement attributes. For example, in order data we want the name of the province where the recipient lives; generally the order only records a province ID, so we must query an external dimension table by that ID to fill in the province name attribute.

In Flink streaming computation, dimension attributes are generally stored in MySQL/HBase/Redis. These dimension tables are updated from time to time and need to be joined according to the business. Depending on how fresh the business requires the joined dimension data to be, there are the following solutions:

  • Query the dimension table in real time
  • Preloads full data
  • The LRU cache
  • other

3.7.1 Querying the Dimension Table in Real time

In this approach, the Flink operator accesses the external database (such as MySQL) directly to perform the join. The access is synchronous, so the data is guaranteed to be up to date. However, when the stream's data volume is too large, it puts huge access pressure on the external system: once connections fail or the thread pool fills up, threads block, and since the call is synchronous, the task waits for data to return, hurting the overall Task throughput.

Moreover, this scheme demands high QPS from the external system. In big-data real-time computing scenarios, QPS is far higher than in ordinary backend systems, peaking at 100,000 to several hundred thousand requests per second, so the overall bottleneck shifts to the external system.

The core of this approach is that we establish a connection to the external system inside Flink's map operator. Taking the order data as an example, we retrieve the city name by the city ID of the user who placed the order. The core code is as follows:

public class Order {
    private Integer cityId;
    private String userName;
    private String items;
    private String cityName;

    public Order() { }

    public Order(Integer cityId, String userName, String items, String cityName) {
        this.cityId = cityId;
        this.userName = userName;
        this.items = items;
        this.cityName = cityName;
    }

    public Integer getCityId() { return cityId; }
    public void setCityId(Integer cityId) { this.cityId = cityId; }
    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }
    public String getItems() { return items; }
    public void setItems(String items) { this.items = items; }
    public String getCityName() { return cityName; }
    public void setCityName(String cityName) { this.cityName = cityName; }

    @Override
    public String toString() {
        return "Order{" +
                "cityId=" + cityId +
                ", userName='" + userName + '\'' +
                ", items='" + items + '\'' +
                ", cityName='" + cityName + '\'' +
                '}';
    }
}

public class DimSync extends RichMapFunction<String, Order> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);
    private Connection conn = null;

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
    }

    public Order map(String in) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(in);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        // Query city_name based on city_id
        PreparedStatement pst = conn.prepareStatement("select city_name from info where city_id = ?");
        pst.setInt(1, cityId);
        ResultSet resultSet = pst.executeQuery();
        String cityName = null;
        while (resultSet.next()) {
            cityName = resultSet.getString(1);
        }
        pst.close();
        return new Order(cityId, userName, items, cityName);
    }

    public void close() throws Exception {
        super.close();
        conn.close();
    }
}

In the code above, the whole dimension-table lookup is wrapped inside a RichMapFunction and performed as part of the join. Note that this approach is generally only suitable when the dimension table is small, and a thread pool should be used to manage the connections to the external system properly. Finally, to ensure connections are closed and released promptly, you must release the connection in the close() method; otherwise MySQL will run out of connections and the task will fail.

3.7.2 Preloading Full Data

The idea is that every time our system starts up, we load all the dimension table data into memory, and then the data is correlated in memory without direct access to the external database.

The advantage of this approach is that we only need to query the external database during each load, which greatly improves efficiency. The problem is that once the dimension table is updated, the Flink task will not notice, so the dimension data may become inconsistent; to mitigate this we can pull the dimension table periodically on a timer. Also, because the dimension data is cached in memory, this approach puts considerable memory pressure on the compute nodes and is unsuitable for very large dimension tables.

We still use the above scenario to associate the city name according to the city ID of the ordering user. The core code is as follows:

public class WholeLoad extends RichMapFunction<String, Order> {
    private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
    private ScheduledExecutorService executor;
    private Map<String, String> cache;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        cache = new ConcurrentHashMap<>();
        load(); // load the full dimension table once at startup
        // Refresh the full dimension table every 5 minutes
        executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    load();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 5, 5, TimeUnit.MINUTES);
    }

    @Override
    public Order map(String value) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(value);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        String cityName = cache.get(String.valueOf(cityId));
        return new Order(cityId, userName, items, cityName);
    }

    public void load() throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
        PreparedStatement statement = con.prepareStatement("select city_id,city_name from info");
        ResultSet rs = statement.executeQuery();
        while (rs.next()) {
            String cityId = rs.getString("city_id");
            String cityName = rs.getString("city_name");
            cache.put(cityId, cityName);
        }
        con.close();
    }
}

3.7.3 LRU cache

LRU is a caching strategy: the least recently used data is evicted first. Under this strategy, dimension-table data naturally divides into cold data and hot data. Cold data is data that is rarely used; hot data is data that is queried frequently.

In the scenario above, where we join the city name by city ID, the order volume of Beijing and Shanghai is far higher than that of some remote cities, so Beijing and Shanghai are hot data and remote cities are cold data. This mode involves some data latency and requires setting an expiration time for each entry: because hotspot data is used so frequently, it stays resident in the cache and would otherwise never notice changes in the dimension table. Here we use the CacheBuilder provided by the Guava library to create the cache:

CacheBuilder.newBuilder()
        // Store a maximum of 10000 entries
        .maximumSize(10000)
        // The expiration time is 1 minute
        .expireAfterWrite(60, TimeUnit.SECONDS)
        .build();

The overall idea is as follows: use Flink's RichAsyncFunction to read HBase data into a cache. When joining the dimension table, we first query the cache; only if the data is not in the cache do we query HBase with the client and then insert the result into the cache.

  1. First we need an Hbase asynchronous client:

    <dependency>
        <groupId>org.hbase</groupId>
        <artifactId>asynchbase</artifactId>
        <version>1.8.2</version>
    </dependency>
  2. The core code is as follows:

    public class LRU extends RichAsyncFunction<String, Order> {
        private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
        String table = "info";
        Cache<String, String> cache = null;
        private HBaseClient client = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Create the asynchronous HBase client
            client = new HBaseClient("127.0.0.1", "7071");
            cache = CacheBuilder.newBuilder()
                    // Store at most 10000 entries
                    .maximumSize(10000)
                    // Expire entries 1 minute after write
                    .expireAfterWrite(60, TimeUnit.SECONDS)
                    .build();
        }

        @Override
        public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
            JSONObject jsonObject = JSONObject.parseObject(input);
            Integer cityId = jsonObject.getInteger("city_id");
            String userName = jsonObject.getString("user_name");
            String items = jsonObject.getString("items");
            // Read the cache first
            String cacheCityName = cache.getIfPresent(String.valueOf(cityId));
            if (cacheCityName != null) {
                Order order = new Order();
                order.setCityId(cityId);
                order.setItems(items);
                order.setUserName(userName);
                order.setCityName(cacheCityName);
                resultFuture.complete(Collections.singleton(order));
            } else {
                // On a cache miss, fetch the dimension data from HBase asynchronously
                client.get(new GetRequest(table, String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
                    for (KeyValue kv : arg) {
                        String value = new String(kv.value());
                        Order order = new Order();
                        order.setCityId(cityId);
                        order.setItems(items);
                        order.setUserName(userName);
                        order.setCityName(value);
                        resultFuture.complete(Collections.singleton(order));
                        cache.put(String.valueOf(cityId), value);
                    }
                    return null;
                });
            }
        }
    }
    • Here we used asynchronous I/O (RichAsyncFunction), which was introduced to solve the problem of network latency becoming the system bottleneck when interacting with external systems.
    • In a streaming environment, if dimension-table queries are synchronous, the overall capability is inevitably limited by the external system. Asynchronous I/O lets requests to external systems run concurrently without waiting for each response to return, which greatly improves throughput and reduces the latency caused by network waits.
    • When using asynchronous I/O, always prefer an asynchronous client; if none exists, you can create your own thread pool to simulate asynchronous requests (a wiring sketch follows this list).
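
For completeness, a hedged sketch of wiring the function above into a job via Flink's AsyncDataStream operator; the sample JSON record, timeout, and capacity values are illustrative:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = env.fromElements(
        "{\"city_id\":1,\"user_name\":\"foo\",\"items\":\"book\"}");
DataStream<Order> orders = AsyncDataStream.unorderedWait(
        source,
        new LRU(),            // the RichAsyncFunction defined above
        1000,                 // timeout for each async request
        TimeUnit.MILLISECONDS,
        100);                 // max number of in-flight requests
orders.print();
env.execute("dim-join-lru");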

3.7.4 other

In addition to the common approaches above, we can also join by broadcasting the dimension-table data, query results directly via SQL joins, build a custom asynchronous thread pool to access the dimension table, or even extend Flink SQL with custom dimension-table join methods.

In general, there are just a few ways to associate dimension tables, and a variety of solutions are derived from these ways. When evaluating the merits of a scheme, we should start from the business itself and use different methods in different business scenarios.

3.8 Efficient deduplication of massive data

There are business scenarios where duplicate data is unacceptable, such as precisely counting the number of users on a site in a day, or counting the number of packages shipped each day in a fact table. In traditional offline computing, you can use SQL's DISTINCT directly, or apply MapReduce-style ideas as the data volume keeps growing. In real-time computing, deduplication is an incremental, long-running process, and the scheme has to change with the scenario because of efficiency and accuracy trade-offs.

Several common real-time deduplication schemes in Flink:

  • Based on the state backend
  • Based on HyperLogLog
  • Based on BloomFilter
  • Based on BitMap
  • Based on an external database

3.8.1 State-based backend

One type of state backend is RocksDBStateBackend. It stores the running status data in the RocksDB database, which by default stores the data in the data directory of the node where the TaskManager runs.

RocksDB is a K-V database and we can use MapState for de-duplication.

Here we simulate a scenario where we count the number of visits to each item SKU. The code is as follows:

public class MapStateDistinctFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {
    private transient ValueState<Integer> counts;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Set the state's TTL (time-to-live) to 24 hours
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(org.apache.flink.api.common.time.Time.minutes(24 * 60))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        // Declare the ValueState and attach the TTL configuration
        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("skuNum", Integer.class);
        descriptor.enableTimeToLive(ttlConfig);
        counts = getRuntimeContext().getState(descriptor);
        super.open(parameters);
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        String f0 = value.f0;
        // First time we see this key: initialize the count to 1
        if (counts.value() == null) {
            counts.update(1);
        } else {
            // Otherwise increment by 1
            counts.update(counts.value() + 1);
        }
        out.collect(Tuple2.of(f0, counts.value()));
    }
}

The basic logic of the above code:

  • It defines a MapStateDistinctFunction class that extends KeyedProcessFunction; the stream is keyed by SKU, so each key keeps its own ValueState counter.
  • The core processing logic is in the processElement method. When a record arrives, we check whether state already exists for its key:
    • if it does not exist, the count is initialized to 1;
    • if it does, the existing count is incremented by one.

Note that we set the state to expire after 24 hours. In real production, a large number of keys will cause the state to swell, so it helps to compress the stored keys, for example by hashing each key down to a few bytes before storing it, as sketched below.
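
As a minimal sketch of that idea, assuming Guava is on the classpath (the key below is invented for illustration), a long business key can be hashed down to 8 bytes before being used as the stored key:

String longKey = "site_20210101_user_with_a_very_long_business_key"; // hypothetical raw key
// 128-bit murmur3 truncated to 64 bits: 8 bytes of key instead of dozens
long compactKey = Hashing.murmur3_128()
        .hashString(longKey, StandardCharsets.UTF_8)
        .asLong();

Hashing trades a tiny collision probability for a large saving in state size, which is usually acceptable in counting scenarios.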

3.8.2 Based on HyperLogLog

HyperLogLog is an estimation algorithm used to count the number of distinct elements in a set, which is known as cardinality counting.

The HyperLogLog algorithm is used for cardinality statistics: each HyperLogLog key needs only about 12 KB of memory to count a cardinality of close to 2^64 elements. Because the cost is so low, at most about 12 KB per key, HyperLogLog is well suited to statistics over very large data volumes.

We can use this approach for statistics in business scenarios where 100% accuracy is not required.

  1. Add the dependency:

    <dependency>
      <groupId>net.agkn</groupId>
      <artifactId>hll</artifactId>
      <version>1.6.0</version>
    </dependency>
  2. We again take the SKU visit volume above as the business scenario; each record has the format <SKU, visiting user ID>:

    public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String, Long>, HLL, Long> {
        @Override
        public HLL createAccumulator() {
            // log2m = 14, regwidth = 5
            return new HLL(14, 5);
        }
        @Override
        public HLL add(Tuple2<String, Long> value, HLL accumulator) {
            // value is an access record <SKU, user ID>
            accumulator.addRaw(value.f1);
            return accumulator;
        }
        @Override
        public Long getResult(HLL accumulator) {
            long cardinality = accumulator.cardinality();
            return cardinality;
        }
        @Override
        public HLL merge(HLL a, HLL b) {
            // Union the two sketches and return the combined one
            a.union(b);
            return a;
        }
    }

    In the above code:

    • The addRaw method inserts an element into the HyperLogLog; if the element is not numeric, it must be hashed into a number before insertion.
    • The accumulator.cardinality() method is used to calculate the cardinality of the elements in HyperLogLog.
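
    A minimal usage sketch, with hypothetical names (VisitSource and the one-hour window are invented): the aggregate runs per SKU over a tumbling window.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Hypothetical source emitting <SKU, userId> records
    DataStream<Tuple2<String, Long>> visits = env.addSource(new VisitSource());
    visits.keyBy(t -> t.f0)
          .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
          .aggregate(new HyperLogLogDistinct())
          .print();
    env.execute("hll-distinct");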

It is important to note that HyperLogLog is not exact deduplication; do not use it if the business scenario requires 100% accuracy.

3.8.3 Based on BloomFilter

BloomFilter is similar to a HashSet: it quickly judges whether an element exists in a collection. A typical application is quickly deciding whether a key exists in a container, returning early if it definitely does not.

As with HyperLogLog, a Bloom filter cannot guarantee 100% accuracy, but it is very efficient for inserts and queries.

public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> {
    private transient ValueState<BloomFilter<String>> bloomState;
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize the two state handles before use
        bloomState = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "bloomState", TypeInformation.of(new TypeHint<BloomFilter<String>>() {})));
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("countState", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        BloomFilter<String> bloomFilter = bloomState.value();
        Long skuCount = countState.value();
        if (bloomFilter == null) {
            // Expect up to 10 million distinct elements; the created filter must be assigned
            bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000000);
        }
        if (skuCount == null) {
            skuCount = 0L;
        }
        if (!bloomFilter.mightContain(value)) {
            // Definitely not seen before: record it and increase the count
            bloomFilter.put(value);
            skuCount = skuCount + 1;
        }
        bloomState.update(bloomFilter);
        countState.update(skuCount);
        out.collect(countState.value());
    }
}

We use Guava's BloomFilter. Each time a record arrives, we check whether the current SKU already exists in the BloomFilter held in state: we initialize the filter if it does not yet exist, and increase the count by one only when the filter has not seen the element before.

3.8.4 Based on BitMap

This method not only reduces storage, but also achieves complete accuracy!

The basic idea of a bitmap is to use a Bit to mark the Value of an element, and the Key is the element. Since Bit is used to store data, the storage space can be greatly saved.

Suppose you need to find out whether some number m exists among 2 billion random integers, on a 32-bit operating system with 4 GB of memory. In Java, an int is 4 bytes and 1 byte = 8 bits.

  • If each number is stored as an int, 2 billion ints occupy about (2000000000 * 4 / 1024 / 1024 / 1024) ≈ 7.45 GB

  • If each number is stored as a single bit instead, 2 billion numbers occupy about (2000000000 / 8 / 1024 / 1024 / 1024) ≈ 0.233 GB

Before using the BitMap approach, if the objects you need to deduplicate are not numbers, you must first convert them to numbers by maintaining a mapping between the objects and numeric IDs. The simplest way is to use auto-increment IDs from a database dimension table; a minimal in-memory version is sketched below.
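
A minimal in-memory sketch of such a mapper, with invented names (a production system would back this with a dimension table):

Map<String, Long> idMapper = new HashMap<>();   // object -> numeric id
AtomicLong nextId = new AtomicLong();
// Assign a dense auto-increment id the first time an object is seen
long idA = idMapper.computeIfAbsent("user_abc", k -> nextId.getAndIncrement()); // 0
long idB = idMapper.computeIfAbsent("user_xyz", k -> nextId.getAndIncrement()); // 1
long idAgain = idMapper.computeIfAbsent("user_abc", k -> nextId.getAndIncrement()); // 0 again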

  1. Add a dependency:

    <dependency>
       <groupId>org.roaringbitmap</groupId>
       <artifactId>RoaringBitmap</artifactId>
       <version>0.8.0</version>
    </dependency>
  2. Then we again use the SKU access records of a commodity as the example:

    public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap, Long> {

        @Override
        public Roaring64NavigableMap createAccumulator() {
            return new Roaring64NavigableMap();
        }
        @Override
        public Roaring64NavigableMap add(Long value, Roaring64NavigableMap accumulator) {
            accumulator.add(value);
            return accumulator;
        }
        @Override
        public Long getResult(Roaring64NavigableMap accumulator) {
            return accumulator.getLongCardinality();
        }
        @Override
        public Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) {
            // Union the two bitmaps instead of returning null
            a.or(b);
            return a;
        }
    }

    In the above method we use Roaring64NavigableMap, an implementation of BitMap. Each access record is added directly to the Roaring64NavigableMap, and finally accumulator.getLongCardinality() directly yields the result.

3.8.5 Based on external databases

Let's say our business scenario is very complex and the data volume is large. To prevent unlimited state expansion and avoid maintaining huge Flink state, we can use external storage such as Redis or HBase; we only need to design the storage key well, as sketched below. With external storage we no longer worry about state being lost when the Flink job restarts, but a restart may replay and re-send data, which can make the result inaccurate.
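
A minimal sketch with Redis sets, assuming Jedis is on the classpath; the key layout distinct:sku:<id>:<day> is invented for illustration. SADD gives exact deduplication and SCARD reads back the current distinct count:

String skuId = "1001";        // hypothetical SKU
String day = "20210101";      // hypothetical date partition
String userId = "user_abc";   // hypothetical visitor

try (Jedis jedis = new Jedis("localhost", 6379)) {
    String key = "distinct:sku:" + skuId + ":" + day;
    jedis.sadd(key, userId);                // returns 1 if new, 0 if already seen
    long distinctUsers = jedis.scard(key);  // exact distinct count so far
    jedis.expire(key, 2 * 24 * 3600);       // let Redis evict the key after two days
}

Note that SADD is idempotent, so replayed records do not inflate the Redis count itself.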

3.9 Flink’s positioning in the enterprise

3.9.1 Real-time computing platform

Business data diminishes in value over time, and more and more scenarios need real-time computing. Against this background, the need for real-time computing platforms emerged.

3.9.1.1 Architecture selection

  • Architecture: Flink adopts the classic master-slave model. A DataFlow Graph is similar to the topology created by Storm. After a Flink program starts, the user's code is turned into a StreamGraph and then optimized into a JobGraph; the JobManager generates an ExecutionGraph from the JobGraph. The ExecutionGraph is the data structure Flink actually executes; when many ExecutionGraphs are distributed across a cluster, they form a network topology.
  • Fault tolerance: For a Spark Streaming task we can configure a checkpoint; when a failure occurs, the task reloads from the checkpoint so no data is lost. However, that process reprocesses part of the original data, so "process exactly once" semantics cannot be achieved. Flink implements End-to-End Exactly-Once processing semantics based on two-phase commit.
  • Backpressure: Flink does not use any complicated mechanism to solve the backpressure problem; it uses distributed blocking queues during data transfer. In a blocking queue, the sender naturally blocks once the queue is full, which in effect gives the pipeline backpressure capability.

3.9.1.2 Overall architecture

  • Real-time data collection layer: Kafka is the most widely used in the industry; other important business data may go through other messaging systems such as RocketMQ. Kafka is particularly suited to high-volume, high-QPS scenarios thanks to its high throughput and low latency, while RocketMQ has unique advantages in transactional messages and consistency.
  • Real-time computing layer: Flink takes on the roles of real-time data acquisition, real-time computing, and downstream delivery. With the open-sourcing of Blink and other real-time products, development modes supporting visualization and SQL have become more and more popular.
  • Data storage layer: in addition to traditional storage engines such as MySQL, data is stored in Redis, HBase, and OLAP engines depending on the scenario. The most important technology selection for this layer is the OLAP engine, since it directly constrains the capabilities of the data storage layer and the data service layer.
  • Data service layer: provides unified external queries, multi-dimensional real-time aggregation, and a complete tenant and permission design to support the data requirements of multiple departments and services. On top of the data service layer there are also data displays, dashboards, metric visualization, and so on.

3.9.1.3 Practical application

According to Meituan's public information, the architecture of their real-time computing platform is as follows:

  • Bottom layer – Kafka-based real-time data collection layer
  • Layer 2 — real-time computing layer based on Flink (Flink on YARN mode)
  • Layer 3: Data storage layer based on Redis, HBase, and ES
  • Top layer – data services layer based on job hosting services, job tuning, diagnostic alarms
  • Computing nodes — thousands (automatic capacity expansion and reduction)

According to Weibo's public information, the architecture of their real-time computing platform is as follows:

  • Early — just two layers of computing and storage
  • Medium term – generic four-tier architecture
  • Later stage – Add ClickHouse to do multi-dimensional calculations to meet the need for fast queries under large data volumes

3.9.2 Real-time data warehouse

A traditional offline data warehouse stores business data centrally and applies fixed computing logic for ETL and, after modeling, applications such as report output.

An offline data warehouse builds T+1 offline data: scheduled tasks pull incremental data every day, construct the subject and dimension data for each service, and expose T+1 data query interfaces externally.

The timeliness of an offline warehouse is poor, yet the value of data weakens as time passes, so data should reach users as soon as possible after it is produced. This is where the requirement to build real-time data warehouses emerged.

3.9.2.1 Advantages of Flink in real-time data warehouse

Flink has natural advantages in real-time data warehouse and real-time ETL:

  • State management: real-time warehouses involve a lot of aggregation, which needs to access and manage state; Flink provides powerful state management;
  • Rich APIs: Flink offers an extremely rich, multi-level API, including the Stream API, the Table API, and Flink SQL;
  • Complete ecosystem: Flink supports a variety of storage systems (HDFS, ES, etc.);
  • Unified stream and batch: Flink is already unifying the APIs for stream and batch computing.

3.9.2.2 Practical application

Layering is similar to an offline warehouse, but the data models are handled slightly differently:

  • The detail layer (DWD) of a real-time data warehouse is generally produced by Flink consuming Kafka messages and performing the necessary joins
  • The dimension layer (DIM) data is usually stored in HDFS or HBase as a supplement to the detail layer

In a real-time data warehouse, different OLAP engines are selected to satisfy ad hoc queries


According to Meituan's public information, their real-time data warehouse architecture is as follows:

  • The ODS layer: log messages based on MySQL Binlog and Kafka (mainly business data);
  • The detail layer: detailed data produced by joining fact data (mainly based on Flink);
  • The summary layer: multi-dimensional aggregation over the detailed data;
  • The application layer: provides HTTP and RPC query services externally.

According to the public information of NetEase Yanxuan, their real-time data warehouse architecture is as follows:

  • The ODS layer: fact data from Kafka;
  • The detail layer: data processed by Flink; some dimension and historical data are joined at the DWD layer and stored in Redis;
  • The summary layer: different storage is chosen for different service scenarios; high-concurrency queries and writes are served by HBase.

It can be seen that NetEase Yanxuan's real-time data warehouse construction mainly weighs computing against storage.