Kafka and Spark Streaming are one of the most common pairings in big data, mainly used for real-time data ingestion and analysis.

To cope with exceptions that might cause the Streaming program to crash, we generally need to manage Kafka's offsets manually rather than have them committed automatically, by setting enable.auto.commit to false. Only by managing offsets ourselves can the streaming system as a whole approach exactly-once semantics as closely as possible.
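
For reference, a minimal consumer configuration for the direct stream might look like the sketch below; the broker addresses and group id are placeholders, and the key point is that enable.auto.commit is set to false.

import org.apache.kafka.common.serialization.StringDeserializer

// Sketch only: the broker list and group.id are placeholders.
// Auto commit is disabled so that offsets are committed only after
// the results have been safely stored.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka1:9092,kafka2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)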

The offset management process

The diagram below provides a brief overview of the process for managing offsets.



Offset Management Process

  • When the Kafka DirectStream is initialized, the offsets of all partitions are obtained so that the DirectStream can start reading from the correct positions.
  • Read the message data, process it, and store the results.
  • Commit the offsets and persist them in reliable external storage.

Both the “process and store results” and the “commit offsets” steps in the figure can be strengthened further, for example by making the result storage idempotent, or by committing the results and the offsets atomically.
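
As an illustration of the atomic variant (which is not the approach used in the rest of this article), the results and the offsets can be written to an external database in a single transaction; the JDBC URL, table names and result type below are purely hypothetical.

import java.sql.DriverManager
import org.apache.spark.streaming.kafka010.OffsetRange

// Sketch only: write the batch results and the corresponding offsets in one
// transaction, so that they become visible together or not at all.
def saveAtomically(results: Seq[(String, Long)], ranges: Seq[OffsetRange]): Unit = {
  val conn = DriverManager.getConnection("jdbc:postgresql://db:5432/app", "user", "pass")
  try {
    conn.setAutoCommit(false)

    val insertResult = conn.prepareStatement("INSERT INTO word_counts (word, cnt) VALUES (?, ?)")
    results.foreach { case (word, cnt) =>
      insertResult.setString(1, word)
      insertResult.setLong(2, cnt)
      insertResult.addBatch()
    }
    insertResult.executeBatch()

    val saveOffset = conn.prepareStatement(
      "UPDATE kafka_offsets SET until_offset = ? WHERE topic = ? AND kafka_partition = ?")
    ranges.foreach { r =>
      saveOffset.setLong(1, r.untilOffset)
      saveOffset.setString(2, r.topic)
      saveOffset.setInt(3, r.partition)
      saveOffset.addBatch()
    }
    saveOffset.executeBatch()

    conn.commit() // results and offsets are committed together
  } catch {
    case e: Exception =>
      conn.rollback()
      throw e
  } finally {
    conn.close()
  }
}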

The figure shows four storage options for offsets: HBase, Kafka itself, HDFS, and ZooKeeper. Weighing implementation difficulty against efficiency, we adopted Kafka itself and ZooKeeper.

Kafka itself

In Kafka 0.10+, offsets are stored by default in a built-in topic named __consumer_offsets instead of in ZooKeeper. Spark Streaming provides the commitAsync() API specifically for committing offsets back to Kafka. It is used as follows.

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Make sure the output is correct and idempotent
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

The snippet above is what the official Spark Streaming documentation shows. In practice, however, we almost always apply some transformations to the DStream, so we capture the offset ranges with DStream’s transform() operator, as in the following example.

var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]

stream.transform(rdd => {
  // Obtain the offset ranges before any other transformation
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}).mapPartitions(records => {
  val result = new ListBuffer[...]
  // process the records ...
  result.toList.iterator
}).foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    // write the results to the database
    session.createDataFrame(...)
  }
  // commit the offsets
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})

In particular, take care not to break the mapping between RDD partitions and Kafka partitions during these transformations. That is, operators such as map() and mapPartitions() are safe, while operators that cause a shuffle or repartitioning, such as reduceByKey(), join(), and coalesce(), are not.

Also note that HasOffsetRanges is a trait implemented by KafkaRDD, and CanCommitOffsets is a trait implemented by DirectKafkaInputDStream. This is easy to see in the source code of the spark-streaming-kafka package.

private[spark] class KafkaRDD[K, V](
    sc: SparkContext,
    val kafkaParams: ju.Map[String, Object],
    val offsetRanges: Array[OffsetRange],
    val preferredHosts: ju.Map[TopicPartition, String],
    useConsumerCache: Boolean
) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges
 
private[spark] class DirectKafkaInputDStream[K, V](
    _ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    ppc: PerPartitionConfig
  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {

This means that the casts cannot be performed after the stream has been transformed (doing so throws a ClassCastException), because the RDD and DStream types change after a transformation. Only the original RDD and DStream, whose element type is ConsumerRecord, implement these traits.
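
For example, the following (incorrect) pattern fails at runtime, because after map() the underlying RDD is an ordinary MapPartitionsRDD that no longer implements HasOffsetRanges:

// Incorrect: the cast is attempted after a transformation
stream.map(record => record.value).foreachRDD { rdd =>
  // throws ClassCastException, since the transformed RDD is not a KafkaRDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}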

ZooKeeper

Although Kafka moved offsets out of ZooKeeper, presumably over performance concerns, ZooKeeper stores its data in a tree of nodes, which makes it naturally suited to small, well-structured data such as offsets. Moreover, we do not have many partitions and our batch interval is relatively long (20 seconds), so ZooKeeper is not a bottleneck.

The ZKGroupTopicDirs class (since marked as deprecated in Kafka) provides the predefined ZooKeeper paths under which Kafka-related data is stored. With this class we can easily manage offsets through ZooKeeper. For convenience, the logic for reading and writing offsets is encapsulated in a class as follows.

class ZkKafkaOffsetManager(zkUrl: String) {
    private val logger = LoggerFactory.getLogger(classOf[ZkKafkaOffsetManager])
 
    private val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, 30000, 30000);
    private val zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false)
 
    def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
        val offsets = mutable.HashMap.empty[TopicPartition, Long]
        val partitionsForTopics = zkUtils.getPartitionsForTopics(topics)
 
        // /consumers/<groupId>/offsets/<topic>/<partition>
        partitionsForTopics.foreach(partitions => {
            val topic = partitions._1
            val groupTopicDirs = new ZKGroupTopicDirs(groupId, topic)
 
            partitions._2.foreach(partition => {
                val path = groupTopicDirs.consumerOffsetDir + "/" + partition
                try {
                    val data = zkUtils.readData(path)
                    if (data != null) {
                        offsets.put(new TopicPartition(topic, partition), data._1.toLong)
                        logger.info(
                            "Read offset - topic={}, partition={}, offset={}, path={}",
                            Seq[AnyRef](topic, partition.toString, data._1, path)
                        )
                    }
                } catch {
                    case ex: Exception =>
                        offsets.put(new TopicPartition(topic, partition), 0L)
                        logger.info(
                            "Read offset - not exist: {}, topic={}, partition={}, path={}",
                            Seq[AnyRef](ex.getMessage, topic, partition.toString, path)
                        )
                }
            })
        })
 
        offsets.toMap
    }
 
    def saveOffsets(offsetRanges: Seq[OffsetRange], groupId: String): Unit = {
        offsetRanges.foreach(range => {
            val groupTopicDirs = new ZKGroupTopicDirs(groupId, range.topic)
            val path = groupTopicDirs.consumerOffsetDir + "/" + range.partition
            zkUtils.updatePersistentPath(path, range.untilOffset.toString)
            logger.info(
                "Save offset - topic={}, partition={}, offset={}, path={}",
                Seq[AnyRef](range.topic, range.partition.toString, range.untilOffset.toString, path)
            )
        })
    }
}

In this way, offsets are stored under the ZooKeeper path /consumers/[groupId]/offsets/[topic]/[partition]. When the DirectStream is initialized, the readOffsets() method is called to obtain the starting offsets, and after each batch has been processed, saveOffsets() is called to update the values in ZooKeeper.
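
A minimal usage sketch might look like the following, assuming that topics, groupId, kafkaParams and the StreamingContext ssc are defined elsewhere:

// Read the starting offsets from ZooKeeper and hand them to the direct stream
val offsetManager = new ZkKafkaOffsetManager("zk1:2181,zk2:2181")
val fromOffsets = offsetManager.readOffsets(topics, groupId)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the RDD and store the results idempotently ...
  offsetManager.saveOffsets(offsetRanges, groupId)
}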

Why not checkpoint

Spark Streaming’s checkpoint mechanism is certainly the easiest to use: checkpoint data is stored on HDFS, and if the Streaming application crashes, it can recover quickly from there.
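
For reference, the standard checkpoint-based recovery pattern looks roughly like this (the checkpoint directory and sparkConf are placeholders):

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(20))
  ssc.checkpoint("hdfs:///user/streaming/checkpoint")
  // build the Kafka direct stream and the processing logic here ...
  ssc
}

// On a clean start this creates a new context; after a crash the context
// is rebuilt from the checkpoint data on HDFS
val ssc = StreamingContext.getOrCreate("hdfs:///user/streaming/checkpoint", createContext _)
ssc.start()
ssc.awaitTermination()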

However, if the Streaming program’s code changes, repackaging and rerunning it causes deserialization exceptions. This is because checkpointing serializes the entire DStream graph, including the closures in our code, the first time it persists, so that everything can be restored on restart. After repackaging, the old and new code no longer match, so restarting from the checkpoint either fails with an error or keeps executing the old logic.

The only way around this is to delete the checkpoint files on HDFS, but doing so also throws away the saved Kafka offsets, which defeats the purpose.