
Xiao Li: Lao Wang, my egg pancake this morning came without the sausage again!

Lao Wang: Ha, my sausage got "donated" again too! Xiao Li, judging by your face, you worked overtime again yesterday.

Xiao Li: Don't mention it. The Logstash machine broke down again yesterday, and the ops team gave me grief over it. It looks like I really do have to learn about that great data ingestion system of yours.

Lao Wang: Sure, let me tell you... blah blah blah…

Bystanders: Is there something seriously wrong with these two…

Lao Wang: We'd better hurry to the office; I'll walk you through it there.

Xiao Li: Ok!

When it comes to dumping Kafka data into HDFS, Logstash is one of the first tools that comes to mind. But Logstash is heavyweight: when the data volume is large, we frequently run into resource shortages, and there is no way to precisely control the size of the files it produces on HDFS. To keep subsequent Hive analysis performant, we then have to merge the small files (small files are a problem HDFS always has to face), which both lengthens the pipeline and wastes resources.

To avoid this waste of resources and shorten the data ingestion pipeline, we implemented a data ingestion system of our own. Its main advantages are listed below.

1. A single configuration file can quickly satisfy a new data ingestion requirement (for the Kafka -> HDFS scenario).

2. It achieves exactly-once consumption, i.e., it guarantees that data is neither lost nor duplicated.

3. The size of the generated files can be configured manually.

The overall architecture is shown in the following figure.

We use the currently popular Flink to consume from Kafka, periodically committing offsets to HBase to achieve exactly-once consumption. The Flink job itself is simple, consisting of two main operators, a KafkaSource and a ParquetSink, as shown in the figure.
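To make the structure concrete, below is a minimal sketch of the job wiring in Java. The class names HBaseOffsetKafkaConsumer and HdfsParquetSink are hypothetical stand-ins for the two operators described in the rest of this article; the topic, servers, and file size are made-up values.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IngestionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint configuration is shown at the end of the article

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "ingestion");

        // HBaseOffsetKafkaConsumer reads its start offsets from HBase (hypothetical name);
        // HdfsParquetSink writes size-controlled Parquet files (hypothetical name)
        env.addSource(new HBaseOffsetKafkaConsumer("events_topic", new SimpleStringSchema(), kafkaProps))
           .addSink(new HdfsParquetSink(128L * 1024 * 1024)); // e.g. 128 MB target file size

        env.execute("kafka-to-hdfs-ingestion");
    }
}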

1. KafkaSource

When consuming from Kafka, we first read the offset information from HBase. If no offset is found, the task is starting for the first time, and we consume from the consumer group's default position. If an offset is found, we must start reading from that position, since the data before the offset has already been successfully landed on HDFS; this avoids duplicate consumption. To implement this, we re-implement Flink's FlinkKafkaConsumerBase class and add the logic for reading offsets from HBase internally.

// Look up the last committed offset for this partition in HBase
long offset = offsetManager.getPartitionOffset(seedPartition.getTopic(), seedPartition.getPartition());
if (offset != -1) {
    // Resume right after the data already landed on HDFS
    subscribedPartitionsToStartOffsets.put(seedPartition, offset);
} else {
    // First start: fall back to the configured startup mode (e.g. group offsets)
    subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
}

This way, when a Flink checkpoint fails and the job is restarted, the offset read from HBase controls where Kafka consumption resumes. Note that the same logic is also needed on a normal task start; we leave working out that detail to the reader.
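The offsetManager itself is not shown in this article, so the following is only a rough sketch of what an HBase-backed implementation could look like, assuming a kafka_offsets table, a topic|partition row key, and an o:offset column; all of these names are invented for illustration.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class OffsetManager {

    private static final byte[] CF = Bytes.toBytes("o");
    private static final byte[] QUALIFIER = Bytes.toBytes("offset");
    private final Connection connection;

    public OffsetManager(Configuration hbaseConf) throws IOException {
        this.connection = ConnectionFactory.createConnection(hbaseConf);
    }

    // Returns the committed offset for a partition, or -1 if none is stored yet,
    // matching the -1 check in the consumer code above.
    public long getPartitionOffset(String topic, int partition) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf("kafka_offsets"))) {
            Result result = table.get(new Get(Bytes.toBytes(topic + "|" + partition)));
            byte[] value = result.getValue(CF, QUALIFIER);
            return value == null ? -1L : Bytes.toLong(value);
        }
    }

    // Stores the offset of the next record to consume (endOffset + 1 in the sink).
    public void saveOffset(String topic, int partition, long offset) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf("kafka_offsets"))) {
            Put put = new Put(Bytes.toBytes(topic + "|" + partition));
            put.addColumn(CF, QUALIFIER, Bytes.toBytes(offset));
            table.put(put);
        }
    }
}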

2. ParquetSink

ParquetSink is mainly responsible for writing data to HDFS. We can borrow the logic for writing Parquet files from the Parquet source code and write the files to HDFS ourselves. Rather than dwelling on how Parquet files are written, let's focus on the interaction with HBase.
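For readers who have not written Parquet directly, here is a minimal, self-contained sketch using the parquet-avro API; the schema and path are made up, and the real sink is more involved. Note getDataSize(), which is how a sink can track how large the in-progress file has grown.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetWriteSketch {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\"," +
                "\"fields\":[{\"name\":\"msg\",\"type\":\"string\"}]}");

        // Write to a temporary ("pending") path; the file is only renamed to its
        // official location when it is committed at checkpoint time.
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(new Path("hdfs:///warehouse/pending/part-0.parquet"))
                .withSchema(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("msg", "hello");
            writer.write(record);
            System.out.println(writer.getDataSize()); // current size of the open file
        }
    }
}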

We implement the snapshotState method of CheckpointedFunction, which is called on every checkpoint, and use it to both generate files and update the HBase offset. First we check whether the file being written has reached the configured size. If it has not, we do nothing. If it has, we promote the temporary file to its official location and update the offset information in HBase. The key code logic is shown below.

// Promote the pending (temporary) file to its stable location
val isSuccess = commitPendingToStable(writerState.getParentPath, writerState.getFileName)
if (isSuccess) {
    // Advance the HBase offset only after the file has safely landed
    offsetManager.saveOffset(partitionInfo.getTopic, partitionInfo.getPartition, writerState.getEndOffset + 1)
}
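To tie the pieces together, here is a rough Java sketch of how such a sink's snapshotState might look end to end. The field names, the single-field msg schema, and the rename-based commit are illustrative assumptions, not our actual production code.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;

public class HdfsParquetSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private final long rollThresholdBytes;              // configured target file size
    private transient ParquetWriter<GenericRecord> writer;
    private transient OffsetManager offsetManager;      // the HBase helper sketched earlier
    private transient Schema schema;
    private Path pendingPath;                           // temporary file being written
    private Path stablePath;                            // official location after commit
    private String topic;
    private int partition;
    private long endOffset;                             // last Kafka offset written to the file

    public HdfsParquetSink(long rollThresholdBytes) {
        this.rollThresholdBytes = rollThresholdBytes;
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        GenericRecord record = new GenericData.Record(schema);
        record.put("msg", value);
        writer.write(record);
        endOffset++; // in real code this comes from the record's Kafka metadata
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // If the in-progress file has not reached the configured size, do nothing
        if (writer.getDataSize() < rollThresholdBytes) {
            return;
        }
        writer.close(); // flush and seal the pending Parquet file

        // Promote the pending file to its official location, then commit the offset
        FileSystem fs = FileSystem.get(new Configuration());
        boolean isSuccess = fs.rename(pendingPath, stablePath);
        if (isSuccess) {
            // Advance the HBase offset only after the file has safely landed
            offsetManager.saveOffset(topic, partition, endOffset + 1);
        }
        // a full implementation would now open a fresh pending writer
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        // open the HBase connection, parse the schema, and create the first pending writer here
    }
}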

So far, we've covered all the key points. Let's summarize by walking through the overall execution flow.

While the job runs, Flink takes a checkpoint every 10 seconds, which invokes the snapshotState method. The method checks whether the current file has reached the configured size; if not, it does nothing. If it has, the file is moved to its official location and the offset is committed to HBase, so the offset stored in HBase always marks data that has been successfully landed. If a checkpoint fails or the task fails, the restarted or recovered job reads the offset back from HBase and resumes consumption from there. This guarantees exactly-once consumption: landed data is neither lost nor duplicated.
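For reference, the 10-second checkpoint cadence is ordinary Flink configuration; a minimal sketch (these are real Flink APIs, while the job body is left out):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Trigger a checkpoint, and therefore snapshotState, every 10 seconds
        env.enableCheckpointing(10_000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // add the source and sink sketched above, then call env.execute(...)
    }
}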

That's all for today. For more interesting content, feel free to follow the official account [programmer senior]. If you have any questions about this article, please leave a comment.