Check whether checkpoint timeout occurs at checkpoint :(this article is based on flink-1.4.2)
Timeout judgment logic
Jobmanager timer trigger checkpoint, send trigger signal to source, and start an asynchronous thread, stop this checkpoint after checkpoint timeout. After the cancel action is executed, the checkpoint of this round will be timed out. If the ack signal of the last sink operator is received before the timeout, the checkpoint is successful.
So what could be the reason for overtime? Time is spent either on barrier alignment or on asynchronous state traversal and HDFS writing. The second type is nice because it happens when the state is large
Barrier Processing Process
StreamTask collects the corresponding inputChannel barrier, sends the barrier after collecting it, and starts its task’s checkpoint logic. If the upstream and downstream are in rescale or forward form, The downstream task only needs to wait for one concurrent barrier because it is point-to-point. If it is hash or rebalance, every downstream task must checkpoint all concurrent upstream barriers.
Barrier Sending process
RecordWriter#broadcastEvent
This method is specifically used to deliver barriers. First, the buffers that have not yet been delivered in serializer will be delivered to prevent barriers from crossing the path of data delivery to ensure consistency. Meanwhile, barriers will be packaged as buffers. Directly package the heap memory as a buffer.
PipelinedSubpartition#add
The data to be sent downstream is added to the subpartition and notifybufferAvailable is notified to the subpartitionView. NotifybufferAvailable is a notifybufferAvailable event. What we see is the delay of remote consumption. In fact, the local barrier is immediately executed downstream, while the remote barrier requires network transmission.
server netty handler
LengthFieldBasedFrameDecoder => messageDecode => PartitionRequestServerHandler => PartitionRequestQueue => MessageEncoder notifybufferAvailable finally triggers the PartitionRequestQueue to writeAndFlush data to the netty client. Before flushing, the notifybufferAvailable event determines whether the channel is writable. If the barrier is flushed successfully, the listener -> barrier processing logic is executed. If the barrier is flushed successfully, the listener -> barrier processing logic is executed. If the barrier is flushed successfully, the listener -> Barrier processing logic is executed.
Barrie Receive process
LengthFieldBasedFrameDecoder => messageDecode => PartitionRequestClientHandler => messageEncoder If there are stagedMessages, they are not processed and added to the stagedMessages. If the data is not copied from netty buffer to localBuffer, requestBuffer is required. This method does not block the consumption process, but if the request does not reach buffer then it will throw data into stagedMessages and listen to bufferPool until buffer recyle is available. The auto read flag of the channel is set to false, so that the channel no longer reads data, the barrier is not readable, and each TaskManager shares a channel. So any blocking on taskManager will affect consumption on that TaskManager.
summary
As can be seen from the above, the main reason why barrier downstream cannot be aligned is due to insufficient consumption capacity of downstream, which will lead to buffer accumulation for a period of time, but this is not enough to cause upstream backpressure, because the backpressure requires the downstream channel to continue to fail to write, resulting in TCP blocking. When the upstream OutputBuffer is full, the backpressure is caused.
The usual screening process
Pending -> 10 subtasks of EventEmit XXX
Subtasks that do not start due to barrier alignment will start at n/a because they have not yet started to checkpoint when reporting their CheckpointStats. Don’t be misled into looking at the subtask problem here.
Then go to JobManager to check for some delay information about this checkpoint
The 2018-11-29 18:43:04, 624 INFO org. Apache. Flink. Runtime. Checkpoint. CheckpointCoordinator - checkpoint 224 expired before Completing. 2018-11-29 18:43:26, 763 WARN org. Apache. Flink. Runtime. Checkpoint. CheckpointCoordinator - Received the newest message for now expired checkpoint attempt 224 from f2862289958b430bc3dc20f39794ca2c of job 7 c7769847d333438dd9ce845d5a2d980. 2018-11-29 18:43:26, 766 WARN org. Apache. Flink. Runtime. Checkpoint. CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from 8f569166274106f22e49ed2ce919c930 of job 7 c7769847d333438dd9ce845d5a2d980. 2018-11-29 18:43:26, 770 WARN org. Apache. Flink. Runtime. Checkpoint. CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from a29e34c210b39104004af7f067c1a5d0 of job 7 c7769847d333438dd9ce845d5a2d980. 2018-11-29 18:43:26, 771 WARN org. Apache. Flink. Runtime. Checkpoint. CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from 7d4914521fd53fca56a4050d6f191ae9 of job 7 c7769847d333438dd9ce845d5a2d980. 2018-11-29 18:43:26, 771 WARN org. Apache. Flink. Runtime. Checkpoint. CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from 618c78d0008d0d525728ff9824339229 of job 7 c7769847d333438dd9ce845d5a2d980. 2018-11-29 18:43:26, 773 WARN org. Apache. Flink. Runtime. Checkpoint. CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from c8ba24a328234dc7f2f271db4a8eb1e3 of job 7 c7769847d333438dd9ce845d5a2d980. 2018-11-29 18:43:26, 777 WARN org. Apache. Flink. Runtime. Checkpoint. CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from 72af6c722fcc085dc8f7c46e9124d82e of job 7 c7769847d333438dd9ce845d5a2d980. 2018-11-29 18:43:26, 777 WARN org. Apache. Flink. Runtime. Checkpoint. CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from f824cb6920b04d19e05278ee362ec675 of job 7 c7769847d333438dd9ce845d5a2d980. 2018-11-29 18:43:26, 780 WARN org. Apache. Flink. Runtime. Checkpoint. CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from af6d867d2f12be23c7b23a938aba7c5e of job 7 c7769847d333438dd9ce845d5a2d980. 2018-11-29 18:43:27, 265 WARN org. Apache. Flink. Runtime. Checkpoint. CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from cee0205fe9a85e3e89e023a1166ed1e6 of job 7 c7769847d333438dd9ce845d5a2d980. 2018-11-29 18:43:44, 624 INFO org. Apache. Flink. Runtime. Checkpoint. CheckpointCoordinator - Triggering checkpoint 225 @ 1543488224622Copy the code
According to the ID of these failed tasks, we can query which taskManager these tasks fall on. After investigation, we find that it is the same machine. Through the UI, we can see that the incoming data of this machine is obviously larger than that of other machines, so this problem is caused by data skew
indicators
There are several indices that can reflect an operator
- inPoolUsage
- OutPoolUsage
- OutputQueueLength
- inputQueueLength
First of all, the first three values are not accurate data obtained without locking, so for the performance of data consumption itself, the reference is not significant. InputQueueLength can be used as a reference to why this is locked, perhaps the community missed it. This value should be the biggest (partition if the hash, other partition method will be more a little bit small) upstream concurrent * 2 + 8 + concurrent upstream, if arrive around the value, will now send the barrier to the downstream cannot deserialize and properly checkpoint operation, as to why the last upstream concurrent alone carry on, This is because of the number of barriers that this implies, which is also counted in inputQueueLength.
In flink1.5, @zhijiangw network stack is optimized for sending data before sending data.