Preface
It has been a while since the last Flink article, where we went through some of the common operators, so before we get into this one, let's warm up and revisit the code.
Now we want to implement a word count with a twist: it uses a custom threshold and prints the buffered results every time the threshold is reached. If you already know Flink, you know we just need to write a custom sink.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestOperatorState {
    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromElements(
                Tuple2.of("spark", 3),
                Tuple2.of("kafka", 3),
                Tuple2.of("flink", 3),
                Tuple2.of("hive", 3),
                Tuple2.of("hbase", 3),
                Tuple2.of("es", 3)
        );
        dataStreamSource.addSink(new MySink(2));
        env.execute("TestOperatorState");
    }
}
The function itself is implemented in MySink:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class MySink implements SinkFunction<Tuple2<String, Integer>> {
    // Elements buffered in memory until the threshold is reached
    private List<Tuple2<String, Integer>> bufferElements;
    // Define a threshold
    private int threshold;

    public MySink(int threshold) {
        this.threshold = threshold;
        bufferElements = new ArrayList<Tuple2<String, Integer>>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        bufferElements.add(value);
        if (bufferElements.size() == threshold) {
            System.out.println("Data:" + bufferElements);
            bufferElements.clear();
        }
    }
}
At runtime you will always see the warning Failed to load class "org.slf4j.impl.StaticLoggerBinder". If that bothers you, you can add this to the pom:
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
However, this turns three lines of output into a screen full of rather puzzling log lines, so decide for yourself whether it is worth it.
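If you do keep slf4j-simple but want less noise, one hedged option (assuming slf4j-simple is the binding on your classpath, and using its defaultLogLevel property) is to raise the log threshold at the very top of main, before any logging happens:
// assumes slf4j-simple; must run before the first logger is created
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "warn");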
There is, however, an obvious problem with this program: the buffered data lives only in memory and is lost whenever the program restarts. To make the state fault-tolerant, Flink needs to checkpoint it.
1. Main Content
1.1 Let’s go back to the program
If we want to checkpoint the program we just wrote, we first give MySink a piece of state:
private ListState<Tuple2<String, Integer>> checkPointState;
Then we implement the CheckpointedFunction interface, which has two methods: snapshotState and initializeState. Their names are self-explanatory: one takes a snapshot of the state, the other initializes (restores) it, as sketched below.
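Here is a minimal, compile-able sketch (an illustration, not the official example) of what MySink looks like once it also implements CheckpointedFunction; the two method bodies are filled in by the snippets that follow:
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class MySink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {
    // elements buffered in memory until the threshold is reached
    private final List<Tuple2<String, Integer>> bufferElements;
    private final int threshold;
    // operator state that backs bufferElements across restarts
    private transient ListState<Tuple2<String, Integer>> checkPointState;

    public MySink(int threshold) {
        this.threshold = threshold;
        this.bufferElements = new ArrayList<Tuple2<String, Integer>>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        bufferElements.add(value);
        if (bufferElements.size() == threshold) {
            System.out.println("Data:" + bufferElements);
            bufferElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // copy bufferElements into checkPointState, see the snippet below
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // register checkPointState and restore bufferElements, see the snippet below
    }
}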
If you have seen Flink's various state types before, you know the drill: register the state first, then use it. So we follow the same pattern here. initializeState is the method that recovers data when the job restarts; note that the data types must stay consistent. A checkpoint really just maintains a special piece of state that records the operator's status, the same kind of state we have seen before.
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Describe the operator state that backs our in-memory buffer
    ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<Tuple2<String, Integer>>(
            "buffer", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
    );
    checkPointState = context.getOperatorStateStore().getListState(descriptor);
    // If the task is restarting, put the checkpointed data back into the in-memory buffer
    if (context.isRestored()) {
        for (Tuple2<String, Integer> lostData : checkPointState.get()) {
            bufferElements.add(lostData);
        }
    }
}
snapshotState is called periodically and writes the data currently held in bufferElements (our in-memory buffer) into the checkpoint state:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    checkPointState.clear();
    for (Tuple2<String, Integer> data : bufferElements) {
        checkPointState.add(data);
    }
}
This example is essentially the one from the official website, with the same code. In practice we rarely write it this way, because it means maintaining one piece of state just to record another piece of state; normally we let Flink manage this for us.
So where is the state stored in Flink?
1.2 State storage location
Flink supports several StateBackends; state can be stored in one of the following three places:
1.2.1 MemoryStateBackend
State is stored in memory. Flink uses a master-slave architecture: a JobManager service is started and the TaskManagers work under it. With this backend the state lives on the TaskManager heap, and when a checkpoint is taken it is stored on the JobManager heap.
And we can specify it manually in our program, like this
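For example, using the legacy StateBackend API that matches the FsStateBackend and RocksDBStateBackend calls shown later in this article (newer Flink versions expose a different configuration style):
env.setStateBackend(new MemoryStateBackend()); // state on the TaskManager heap, checkpoints on the JobManager heap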
So if our program crashes, the data in memory is naturally lost; to deal with that, we switch to a different backend.
1.2.2 FsStateBackend
FsStateBackend is an improvement over MemoryStateBackend: the TaskManager still works on the state locally, and at checkpoint time the state is persisted to a configured file path (such as HDFS).
env.setStateBackend(new FsStateBackend("hdfs path"));
Disadvantage: the state size is still limited by TaskManager memory (5 MB by default, configurable), so if the data in memory exceeds this limit before it is written to HDFS, data can still be lost. Advantage: the state is operated on in memory, so access is fast.
1.2.3 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("path"));
This is the configuration used in production. State is stored in a local RocksDB database (a key-value store), and at checkpoint time the state is persisted to a configured file path (such as HDFS).
Disadvantage: state access is slower than with FsStateBackend. Advantage: it can hold an enormous amount of state information, because storage is also distributed.
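Note that the RocksDB backend ships as a separate module, so the job typically needs an extra dependency roughly like the one below; the artifact's Scala suffix and the version are assumptions here and must match your Flink build:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>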
1.3 Introduction to checkpoint
To ensure the fault tolerance of the state, Flink needs to checkpoint the state.
Checkpointing is the core of Flink's fault-tolerance mechanism. Based on the configuration, it periodically takes snapshots of the state of each operator/task in the stream and persists that state data durably. If the Flink program crashes unexpectedly, you can selectively recover from these snapshots when re-running it, correcting the data anomalies caused by the failure.
Flink's checkpoint mechanism has two prerequisites: a persistent source that can replay events for a certain amount of time, typically a durable message queue (e.g. Apache Kafka, RabbitMQ), and persistent storage for the state, typically a distributed file system (e.g. HDFS, S3, GFS).
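As a concrete illustration, a replayable source is usually something like a Kafka consumer; a minimal sketch (the topic, broker address and group id are made up for illustration, and the flink-connector-kafka dependency is assumed):
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
props.setProperty("group.id", "demo-group");              // assumed consumer group
DataStream<String> lines = env.addSource(
        new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props));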
A Flink job contains many tasks, all of which produce state, and that state is periodically stored somewhere, just like the checkPointState in our example. When the job comes back up, the state is simply read back out and used for recovery.
1.4 Setting Parameters
Checkpointing is not enabled by default, so you need to turn it on explicitly:
env.enableCheckpointing(10000);
This checkpoints every 10 seconds; however, the recommended interval is between 20 and 120 seconds.
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
This sets the checkpoint mode to exactly-once semantics, which is the default. At-least-once certainly performs better, but data can be duplicated, so choose according to the scenario. Note that this is not the same thing as Spark Streaming, which can achieve exactly-once thanks to RDD fault tolerance.
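If at-least-once is acceptable for your scenario, the corresponding setting would be:
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);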
checkpointConfig.setMinPauseBetweenCheckpoints(500);
This is the minimum pause between two checkpoints. You might wonder what the point is, given that we already set a 10-second checkpoint interval. The answer is that executing a checkpoint itself takes time: it might take 10 seconds, or still be running after 10 seconds. setMinPauseBetweenCheckpoints(500) guarantees that, after one checkpoint finishes, Flink waits at least this long before starting the next one.
checkpointConfig.setCheckpointTimeout(60000);
This sets the checkpoint timeout: if a checkpoint has not completed within one minute, it is aborted and discarded.
checkpointConfig.setMaxConcurrentCheckpoints(1);
The default value is 1, which means at most one checkpoint can be in progress at any given time.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
With RETAIN_ON_CANCELLATION, the checkpoint data is kept when the Flink job is cancelled, so the job can later be restored from a specified checkpoint if required. Alternatively, you can configure the checkpoint data to be deleted automatically when the job is cancelled.
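Putting the settings from this section together, a minimal sketch of a complete checkpoint configuration block (the values are the ones used above, not recommendations):
env.enableCheckpointing(10000); // take a checkpoint every 10 seconds

CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);   // default mode
config.setMinPauseBetweenCheckpoints(500);                     // at least 500 ms between two checkpoints
config.setCheckpointTimeout(60000);                            // abort a checkpoint that runs longer than 1 minute
config.setMaxConcurrentCheckpoints(1);                         // at most one checkpoint in flight at a time
config.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // keep checkpoints on cancel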
1.5 Using checkPoint to Restore Data
Restoring data is essentially just a matter of restarting the task:
Flink supports different restart strategies, which control how a job is restarted after a failure. The cluster has a default restart strategy that is used whenever no job-specific strategy is defined. If a restart strategy is specified when the job is submitted, it overrides the cluster default. The default strategy is set in the flink-conf.yaml configuration file, where the restart-strategy parameter defines which strategy is used.
Common restart policies
(1) Fixed delay strategy
(2) Failure rate
(3) No restart
If checkpointing is not enabled, the no-restart strategy is used.
If checkpointing is enabled but no restart strategy is configured, the fixed-delay strategy is used, with Integer.MAX_VALUE restart attempts by default. The restart strategy can be configured globally in flink-conf.yaml, or specified dynamically in the application code, which overrides the global configuration.
1.5.1 Fixed-Delay Restart Strategy
The first option is global configuration in flink-conf.yaml:
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
The second option is to configure it in the application code:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                             // number of restart attempts
        Time.of(10, TimeUnit.SECONDS)  // delay between attempts
));
1.5.2 Failure-Rate Restart Strategy (rarely used)
The first option is global configuration in flink-conf.yaml:
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
It can also be set in code
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3,                             // maximum number of failures per interval
        Time.of(5, TimeUnit.MINUTES),  // the interval over which failures are counted
        Time.of(10, TimeUnit.SECONDS)  // delay between restart attempts
));
This means that at most 3 failures are allowed within any 5-minute window, with a 10-second delay between restart attempts.
1.5.3 No Restart
If the job dies, it dies; just let it go.
The first option is global configuration in flink-conf.yaml:
restart-strategy: none
Or set it in code:
env.setRestartStrategy(RestartStrategies.noRestart());
Of course, in day-to-day development we rarely change the global configuration; we usually adjust these settings per job according to its requirements.
1.6 Multiple checkpoint Settings
By default, if checkpointing is enabled, Flink keeps only the latest checkpoint, and when a failure occurs it recovers from that latest checkpoint. However, it is more flexible to keep multiple checkpoints and choose which one to recover from as needed. For example, if we discover a problem with the data recorded over the last four hours, we may want to roll the whole state back to four hours ago, so Flink allows retaining multiple checkpoints.
Add the following configuration to the Flink configuration file conf/flink-conf.yaml to specify the maximum number of checkpoints to be saved
state.checkpoints.num-retained: 20
Be careful not to confuse this with the setMaxConcurrentCheckpoints(...) call we used earlier: that setting controls how many checkpoints may run concurrently, while state.checkpoints.num-retained controls how many completed checkpoints are kept.
After the task restarts, the latest checkPoint is used to restore data
With this setting you can list the retained checkpoints in HDFS with hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints. If you want to roll back to a particular checkpoint, you only need to specify the path of that checkpoint.
1.7 Manually Restore Data using checkPoint
Every Flink job has its own JobID, and the checkpoint data saved to HDFS is named after that JobID, so take note of it. If we need to recover from a checkpoint manually, we go to the HDFS directory and find our checkpoint folder; by default it is named chk-xx, where the trailing number is the checkpoint number. The command is:
bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar
Of course there is still a problem here, because all Flink itself guarantees is
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
which is not end-to-end exactly-once. When the business changes and we need to stop the job and modify the code, some data may still be processed more than once.
This is where savepoints come in. With a savepoint, a program can resume computation after an upgrade from exactly where it left off, so the data stream is not interrupted. A savepoint stores the source offsets, operator state and other information, and the application can resume consuming from any point in the past at which a savepoint was taken.
1: Set the savepoint storage location in flink-conf.yaml
This parameter is optional; if it is set, savepoints created for a job default to this location, so you do not need to specify the savepoint location manually when running the command.
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
2: Trigger a savepoint (either directly, or when cancelling the job)
bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]   // in YARN mode you need to specify -yid
// Stop the Flink job with this command; it takes one more savepoint before exiting
bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]   // in YARN mode you need to specify -yid, which is application_xxx
3: Start the job from the specified savepoint
bin/flink run -s savepointPath [runArgs]
A savepoint is triggered manually by the user; it is essentially a pointer to a checkpoint that does not expire, and it is typically used when upgrading a job.
Note: to allow upgrading between different versions of a job, and between different versions of Flink, it is strongly recommended that you manually assign an ID to each operator with the uid(String) method.
These IDs are used to determine the state scope of each operator. If you do not assign IDs manually, Flink generates them automatically, and the program can still be restored from a savepoint as long as those IDs do not change; but the auto-generated IDs depend on the structure of the program and are sensitive to code changes, so manually setting the IDs is strongly recommended.
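Applied to the example in this article, that would look roughly like this (the ID strings are arbitrary; they just have to stay stable across versions of your job):
DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(
        Tuple2.of("spark", 3),
        Tuple2.of("flink", 3));
source.uid("demo-source");                      // stable ID for the source operator
source.addSink(new MySink(2)).uid("demo-sink"); // stable ID for the sink and its operator state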
Finally
Up next: watermark.