The dilemma of streaming computing

Before addressing out-of-order messages, we need to define time and order. In stream processing there are two notions of time:

  • Event time: the time at which an event occurs, usually carried as a timestamp along with the data. Data that flows with timestamps includes web service logs, monitoring agent logs, and mobile client logs.
  • Processing time: the clock time of the server that processes the event data, generally the clock of the machine running the stream processing application.

In the figure above, time1, time2 and time3 are the times at which Spark Streaming receives the messages and processes them. The numbers in the colored blocks represent the times at which the events were generated. Network jitter may delay log collection on some machines, so the time3 batch contains logs whose event time is 2. In Kafka, messages from different partitions are not ordered relative to each other.

Two problems arise in real-time processing:

  • A batch that Spark Streaming pulls from Kafka may contain data from multiple time intervals.
  • Data for the same time may appear in multiple batches.

For the first problem, a batch contains data from multiple time intervals, and the interval granularity we use is 5 minutes. A single batch may therefore contain part of the data for the 0~5 interval and part of the data for the 5~10 interval. This is easy to handle: first floor each timestamp down to a 5-minute boundary, then group by the floored time and compute the metric, i.e. select floor(time), count(*) group by floor(time).
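
A minimal sketch of this flooring-and-grouping step, assuming logs is an RDD[(Long, String)] of (event time in milliseconds, log line) taken from one batch:

val intervalMs = 5 * 60 * 1000L

def floorToInterval(eventTimeMs: Long): Long =
  eventTimeMs - (eventTimeMs % intervalMs)   // round down to the 5-minute boundary

val countsPerInterval = logs
  .map { case (eventTimeMs, _) => (floorToInterval(eventTimeMs), 1L) }
  .reduceByKey(_ + _)   // i.e. select floor(time), count(*) group by floor(time)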

This way, the data for each time interval contained in the batch is computed separately.

Take 2 in the figure, for example: it appears in both the time2 and time3 batches. We need to compute the metric for 2 in each batch and then accumulate the results. During accumulation the state can be kept in memory, using operators such as updateStateByKey in Spark Streaming; this is not recommended, because it introduces state and the checkpoint mechanism into your application. The other way is to keep the state in persistent storage, for example by accumulating into Redis or HBase on every batch, as in the sketch below.
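
A minimal sketch of the persistent-storage variant, accumulating per-interval counts into HBase (the table and column names are assumptions):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.util.Bytes

// countsPerInterval: RDD[(Long, Long)] of (interval start, count) from one batch
countsPerInterval.foreachPartition { partition =>
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf("metrics"))
  partition.foreach { case (interval, count) =>
    // server-side atomic increment: the same interval arriving in several
    // batches is simply added up
    table.incrementColumnValue(Bytes.toBytes(interval.toString),
      Bytes.toBytes("m"), Bytes.toBytes("count"), count)
  }
  table.close()
  conn.close()
}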

Spark Streaming can pull logs from Kafka without losing any; see my earlier article.

However, this mode can hardly guarantee exactly-once semantics.

For example, the following situation can occur:

We checkpoint Job1; Job1 is scheduled and executes, pulling data from Kafka, processing it, and saving the results to HBase. Halfway through saving, the machine goes down. When we restart and recover, Job1 is executed again, the Kafka data is consumed a second time, and some metrics in HBase are incremented twice.

We could use the watermark feature provided in Spark or Flink.

In other words, maintain a window and set a maximum waiting time; the data in T1~T4 triggers the calculation once the maximum waiting time is reached. However, this approach has its own problems.

Of course, if the business can tolerate dropping data that arrives later than the watermark, then this feature is fine to use, with each output fully overwriting the previous result.
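
For reference, a minimal sketch of the watermark approach in Spark Structured Streaming (the events streaming DataFrame and its eventTime column are assumptions):

import org.apache.spark.sql.functions.window

val counts = events
  .withWatermark("eventTime", "10 minutes")            // maximum waiting time for late data
  .groupBy(window(events("eventTime"), "5 minutes"))   // 5-minute event-time windows
  .count()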

The solution

The problem we face above is that the Spark Streaming + Kafka combination can guarantee at-least-once, but it is hard to guarantee exactly-once; that is, there is repeated consumption, and we have to find a way to deduplicate.

There are two modes for landing the computed results in storage:

  • Append (delta) mode: add to the stored value every time.
  • Complete mode: guarantee idempotence by overwriting the full result every time, so repeated writes have no side effects.

Because data for the same time may appear in multiple batches, we can only use append mode in near-real-time computation, and as demonstrated above, this mode suffers from repeated consumption.

Since machines do fail occasionally, we can repair the data offline after a failure; that is, we need to make sure a full copy of the data is landed offline.

We need to ensure that this landed data misses as little as possible and is partitioned by event-time interval, so that for a problematic time interval we can load the offline data of that interval, recompute the result, and overwrite it. This guarantees the accuracy of the data.

The data we land therefore has two characteristics:

  • Complete, with as little data missing as possible
  • Sharded according to the defined time interval

First, we need a unique ID for each message. We use the Kafka partition plus offset as the message ID. If the message is stored in HBase with this ID as part of the rowkey, the ID never repeats, and even if the job is rerun many times, HBase automatically deduplicates by overwriting the same row.

If the data is stored on HDFS, we can prepend the ID as the first field of each row. Offline processing can then deduplicate on this field first, which gives us exactly-once semantics.
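
A minimal sketch of attaching such an ID, assuming the Kafka direct stream where each record carries its topic, partition and offset:

// stream: DStream[ConsumerRecord[String, String]] from KafkaUtils.createDirectStream
stream.foreachRDD { rdd =>
  val keyed = rdd.map { record =>
    // topic + partition + offset uniquely identify the message; replaying the
    // batch yields the same id, so the downstream write is idempotent
    val id = s"${record.topic}-${record.partition}-${record.offset}"
    (id, record.value)
  }
  // write (id, value) with id as the HBase rowkey, or prepend id to each HDFS row
}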

Output flow

When Spark Streaming stores to HDFS or HBase, it calls saveAsHadoopDataset:

val writer = new SparkHadoopWriter(hadoopConf)
writer.open()
Utils.tryWithSafeFinallyAndFailureCallbacks {
  while (iter.hasNext) {
    val record = iter.next()
    // hand each key/value pair to the underlying OutputFormat's RecordWriter
    writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
  }
}(finallyBlock = writer.close())
writer.commit()

It gets a writer based on the OutputFormat you pass in, then calls open, write, close, and commit on that writer.
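
For reference, the calling side for HBase typically looks like this (a sketch; the table and column names are assumptions):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

val jobConf = new JobConf(HBaseConfiguration.create())
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "metrics")

// resultRdd: RDD[(String, Long)] of (rowkey, count) produced by the batch
resultRdd.map { case (rowkey, count) =>
  val put = new Put(Bytes.toBytes(rowkey))
  put.addColumn(Bytes.toBytes("m"), Bytes.toBytes("count"), Bytes.toBytes(count))
  (new ImmutableBytesWritable, put)
}.saveAsHadoopDataset(jobConf)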

If the target is HBase, the write method of the HBase client is invoked:

  • After a user submits a put request, the HBase client adds it to a local buffer, and when certain conditions are met the buffered puts are submitted asynchronously in batches through AsyncProcess. The HBase client's autoFlush defaults to true, which means each put request is submitted directly to the server for processing. You can set autoflush=false so that put requests are first placed in the local buffer and only committed once the buffer grows beyond a certain threshold (2MB by default, configurable in the configuration file). The latter uses a group-commit mechanism, which greatly improves write performance, but because there is no protection mechanism, the buffered requests can be lost if the client crashes (see the sketch after this list).

  • Before submission, HBase looks up in the meta table which region server the region of each rowkey belongs to; this is done with HConnection's locateRegion method. The rowkeys are then grouped by HRegionLocation for the batch request, and each group corresponds to a single RPC request.

  • HBase constructs a remote RPC request MultiServerCallable for each HRegionLocation and executes the call through rpcCallerFactory.newCaller(); retries on failure and error handling are omitted here. This is the end of the client-side submission.
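
The buffering behavior described in the first point can also be reproduced with the newer client API's BufferedMutator, which plays the role of autoflush=false; a minimal sketch, with the table and column names assumed:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{BufferedMutatorParams, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
val params = new BufferedMutatorParams(TableName.valueOf("metrics"))
  .writeBufferSize(2 * 1024 * 1024)   // flush once ~2MB of puts are buffered
val mutator = conn.getBufferedMutator(params)

val put = new Put(Bytes.toBytes("rowkey"))
put.addColumn(Bytes.toBytes("m"), Bytes.toBytes("count"), Bytes.toBytes("1"))
mutator.mutate(put)   // buffered locally, submitted asynchronously in batches

mutator.close()       // flushes anything still buffered; lost if the client crashes first
conn.close()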

If the target is an HDFS file:

  • First, a temporary write path is constructed from the TaskAttemptID and a file stream is opened on it.
  • The data is written to the temporary path.
  • At commit time, commitTask is called. If the destination path already exists, the temporary file is deleted and an error is reported; if it does not exist, the temporary file is renamed to the destination file name. This is mainly to prevent multiple partitions from writing to the same destination file and conflicting (see the sketch after this list).
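
A minimal sketch of this rename-on-commit idea (not Spark's actual committer code, just the shape of it):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def commitTask(conf: Configuration, tmpFile: Path, destFile: Path): Unit = {
  val fs = FileSystem.get(conf)
  if (fs.exists(destFile)) {
    // another attempt already committed this output: drop ours and report the conflict
    fs.delete(tmpFile, false)
    throw new IllegalStateException(s"$destFile already exists")
  } else {
    // promote the temporary file to its final name
    fs.rename(tmpFile, destFile)
  }
}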

Multi-file grouping output

Suppose I need to output data to different files based on different keys: within each batch I first group the data by key and then write each group to its own file, which requires MultipleOutputFormat. Its write path looks like this:

// inside MultipleOutputFormat.getRecordWriter(...).write(key, value), simplified
TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();

K actualKey = generateActualKey(key, value);
V actualValue = generateActualValue(key, value);
// finalPath is derived from the key via generateFileNameForKeyValue
RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
if (rw == null) {
    // lazily create one writer per distinct output path
    rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
    this.recordWriters.put(finalPath, rw);
}
rw.write(actualKey, actualValue);

This maintains a TreeMap and constructs a writer for each distinct key. Each writer comes from getBaseRecordWriter -> theTextOutputFormat.getRecordWriter, which builds an output stream from the temporary path and wraps it in a LineRecordWriter; the final writes go out over this DataOutputStream.
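
On the calling side this is typically used by subclassing MultipleTextOutputFormat and routing records by key, roughly as in this sketch (class and path names are assumptions):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// route each record to a file named after its key (e.g. the 5-minute interval)
class IntervalOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    s"$key/$name"                      // one sub-directory per key
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()                 // keep only the value in the output line
}

// rdd: a pair RDD of (key, line)
rdd.saveAsHadoopFile("/logs/output", classOf[Any], classOf[Any],
  classOf[IntervalOutputFormat])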

The upper-layer multi-file output gets a different file output stream from the TreeMap for each key and then writes to multiple files. There is a problem here: data for the same time may appear in multiple batches, which means many small files will be generated. But if we simply append to the existing output, we can avoid creating small files.

This is where we need to modify the source code.

MultipleOutputFormat's writer calls the subclass's getBaseRecordWriter, which we point at our own getRecordWriterNew method of TextOutputFormatNew; when the output stream is constructed in that method, it appends if the file already exists:

// inside TextOutputFormatNew.getRecordWriterNew: append if the file exists, otherwise create it
val fileOut: FSDataOutputStream = if (HDFSFileService.existsPath(file)) {
  println("appendfile")
  fs.append(file)
} else {
  println("createfile")
  fs.create(file, progress)
}
// build the final output path directly, without any temporary path
def getTaskOutputPath(job: JobConf, iname: String): Path = {
  val name: String = job.get(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR)
  val completePath = name + "/" + iname
  val path = new Path(completePath)
  path
}

The method that constructs the temporary path has also been modified to skip the temporary path entirely and append to the same file each time, which achieves our goal.
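
Putting it together, the wiring looks roughly like this (a sketch: MultipleTextOutputFormatNew is our own class, and the signature of TextOutputFormatNew.getRecordWriterNew is assumed to mirror the stock getRecordWriter):

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.{JobConf, RecordWriter}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.hadoop.util.Progressable

class MultipleTextOutputFormatNew extends MultipleTextOutputFormat[Any, Any] {
  private val theTextOutputFormat = new TextOutputFormatNew[Any, Any]

  // delegate to our append-aware writer instead of the stock TextOutputFormat
  override def getBaseRecordWriter(fs: FileSystem, job: JobConf, name: String,
      progress: Progressable): RecordWriter[Any, Any] =
    theTextOutputFormat.getRecordWriterNew(fs, job, name, progress)
}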