Introduction

  • Spark Streaming consumes Kafka with exactly-once semantics, ensuring that messages are neither lost nor consumed repeatedly.

Semantics of message processing

At Least Once

  • The message is processed at least once
  • No data is lost, but data may be duplicated.

At Most Once

  • Messages are processed at most once
  • No data is duplicated, but data may be lost.

Exactly Once

  • The message is processed exactly once
  • In reality a message cannot be guaranteed to be physically processed only once; what can be achieved is message reliability plus message idempotency, so that the upstream and downstream systems see neither data duplication nor data loss.
  • In practice, Exactly Once semantics are achieved through At Least Once + idempotent processing.

Process and problems of consuming data

By default the Kafka consumer auto-commits offsets (every 5 seconds by default), so two situations can arise; the relevant consumer properties are shown in the sketch after the tip below.

  • Tip: an offset records how far a consumer has read in each partition (queue); consumer offsets are stored in Kafka's __consumer_offsets topic.
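
For reference, the auto-commit behaviour is controlled by the consumer properties below. This is only a minimal sketch, and the broker address is a placeholder.

import java.util.Properties

// Auto-commit related consumer settings, shown with their default values
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "localhost:9092")   // placeholder broker address
consumerProps.put("enable.auto.commit", "true")            // auto-commit is enabled by default
consumerProps.put("auto.commit.interval.ms", "5000")       // offsets are committed every 5 seconds by default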

Case 1: The offset is committed before the message is processed

  • If the offset is committed first and the process crashes while the data is still being processed and has not yet been written to storage, the next consumption will start from the newly committed offset, so the data that was never persisted is lost.

Case 2: The offset is committed after the message is processed

  • If the process crashes after the message has been processed but before the latest offset is committed, the next consumption will resume from the old offset, which may cause the same data to be consumed again.

It can be seen that consuming a message involves two steps, processing the message and committing the offset, and we cannot guarantee the atomicity of these two steps (that they succeed or fail together), which can lead to data loss or repeated consumption.

To realize Exactly Once

Method 1: Use transactions

  • The key to implementing Exactly Once semantics is to guarantee the atomicity of processing the message and committing the offset.
  • By putting these two operations into a single transaction, it does not matter whether the message is processed first or the offset is committed first: the message will be neither lost nor duplicated.

Implementation

  • For example, maintain the consumed offsets manually and store them in MySQL, and write the processed data into MySQL as well. Since MySQL supports transactions, the atomicity of the two writes can be guaranteed, as in the sketch below.
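
A minimal sketch of this idea, assuming a hypothetical user_log table for the business data, a hypothetical kafka_offset table for the offsets, and placeholder JDBC connection settings; both writes commit or roll back in a single MySQL transaction.

import java.sql.{Connection, DriverManager}

// Assumed tables:
//   user_log(id INT PRIMARY KEY, name VARCHAR(64))
//   kafka_offset(topic VARCHAR(128), group_id VARCHAR(128), partition_id INT,
//                until_offset BIGINT, PRIMARY KEY(topic, group_id, partition_id))
def saveBatchInTransaction(records: Seq[(Int, String)],
                           topic: String, groupId: String,
                           offsets: Seq[(Int, Long)]): Unit = {
  // Placeholder URL and credentials
  val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")
  try {
    conn.setAutoCommit(false)   // start a transaction

    // 1. Write the business data
    val dataStmt = conn.prepareStatement("INSERT INTO user_log(id, name) VALUES (?, ?)")
    records.foreach { case (id, name) =>
      dataStmt.setInt(1, id); dataStmt.setString(2, name); dataStmt.addBatch()
    }
    dataStmt.executeBatch()

    // 2. Write the offsets in the same transaction
    val offsetStmt = conn.prepareStatement(
      "REPLACE INTO kafka_offset(topic, group_id, partition_id, until_offset) VALUES (?, ?, ?, ?)")
    offsets.foreach { case (partition, untilOffset) =>
      offsetStmt.setString(1, topic); offsetStmt.setString(2, groupId)
      offsetStmt.setInt(3, partition); offsetStmt.setLong(4, untilOffset)
      offsetStmt.addBatch()
    }
    offsetStmt.executeBatch()

    conn.commit()               // both writes succeed or both roll back
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally {
    conn.close()
  }
}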

Disadvantages:

  • It ties you to the storage layer: only a transactional storage layer can be used.
  • Transaction performance is not high.
  • A single storage layer is prone to single points of failure and heavy load; making it distributed requires distributed transactions, which adds complexity.

Method 2: Manually commit the offset + idempotency

  • Make sure the data is processed before the offset is committed. The offset commit may still fail and cause repeated consumption, so the data must be saved idempotently: no matter how many times the same data is saved, the effect is the same and no duplicates appear. A minimal sketch of this process-then-commit pattern is shown below.
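
As an illustration of the process-first, commit-second pattern, here is a minimal sketch that commits offsets back to Kafka with the commitAsync API from spark-streaming-kafka-0-10 instead of an external store (the idempotent save itself is only indicated by a comment); the pseudo-code later in this article stores the offsets in Redis instead.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

// stream: the InputDStream returned by KafkaUtils.createDirectStream
def processThenCommit(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // 1. Remember the offset ranges of this batch
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // 2. Process and idempotently persist the data first
    rdd.foreachPartition { records =>
      records.foreach { record =>
        // upsert record.value() into an idempotent store (primary key = business id)
      }
    }

    // 3. Only after the data is safely stored, commit the offsets back to Kafka
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}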

Idempotent save implementation

  • Some storage layers natively support idempotent operations, such as MySQL's primary keys and unique indexes: inserting the same ID once has the same effect as inserting it 100 times. Elasticsearch's document ID is likewise naturally idempotent (a write with the same ID overwrites the previous document), and so is Redis. Far more storage layers support idempotent operations than support transactions, and they generally perform better.
  • If you use a storage layer that does not support idempotent operations, you may need to implement idempotency or deduplication yourself; an idempotent upsert for MySQL is sketched below.
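
A minimal sketch of an idempotent save against MySQL, assuming a hypothetical user_log(id, name) table whose id column is the primary key; replaying the same batch leaves the table unchanged.

import java.sql.Connection

// Upsert: inserting the same id again simply overwrites the row, so retries are harmless
def idempotentSave(conn: Connection, records: Seq[(Int, String)]): Unit = {
  val stmt = conn.prepareStatement(
    "INSERT INTO user_log(id, name) VALUES (?, ?) ON DUPLICATE KEY UPDATE name = VALUES(name)")
  records.foreach { case (id, name) =>
    stmt.setInt(1, id)
    stmt.setString(2, name)
    stmt.addBatch()
  }
  stmt.executeBatch()
  stmt.close()
}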

Pseudo-code implementation:

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

import ExactOneUtil.Builder

object Test {

  case class UserLog(id: Int, name: String)

  def main(args: Array[String]): Unit = {
    // 1. Set up Spark Streaming
    val conf: SparkConf = new SparkConf().setAppName("").setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))

    // 2. Topic and consumer group; the offsets of this consumer group are kept in Redis
    val topic = "topic-log"
    val groupId = "consumer-007"

    // 3. Build the Kafka stream and hand it to the business processing function
    ExactOneUtil.Builder().streamingContext(ssc).topicGroup(topic, groupId).build(businessProcessing)

    ssc.start()
    ssc.awaitTermination()
  }

  /** Business processing */
  def businessProcessing(offsetDStream: DStream[ConsumerRecord[String, String]], builder: Builder): Unit = {
    // Parse each Kafka record into a JSONObject
    val jsonObjectDStream: DStream[JSONObject] = offsetDStream.map(msg => {
      val jsonObj: JSONObject = JSON.parseObject(msg.value())
      // ...
      jsonObj
    })

    jsonObjectDStream.foreachRDD(rdd => {
      rdd.foreachPartition(jsonObjList => {
        // With id as the primary key the save is naturally idempotent, no matter how many times it runs
        val resultData: Iterator[UserLog] = jsonObjList.map { obj =>
          UserLog(obj.getIntValue("id"), obj.getString("name"))
        }
        // Persist the results idempotently
        resultData.toList
        // ... save to MySQL, Elasticsearch or Redis
      })

      // Commit the offsets only after this batch has been processed and persisted
      builder.saveOffsetrange()
    })
  }
}

ExactOneUtil

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

object ExactOneUtil {
  var builder: Builder = _

  def Builder(): Builder = {
    this.builder = new Builder()
    this.builder
  }

  def stop(): Unit = {
    this.builder.saveOffsetrange()
  }

  class Builder {
    var topic: String = _
    var groupId: String = _
    var ssc: StreamingContext = _
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange] // offset ranges of the current batch (partition, fromOffset, untilOffset)

    def topicGroup(topic: String, groupId: String): Builder = {
      this.topic = topic
      this.groupId = groupId
      this
    }

    def streamingContext(ssc: StreamingContext): Builder = {
      this.ssc = ssc
      this
    }

    def build(fun: (DStream[ConsumerRecord[String, String]], Builder) => Unit): Unit = {
      var baseInputDStream: InputDStream[ConsumerRecord[String, String]] = null
      val offsetMap: Map[TopicPartition, Long] = OffsetRedisUtil.getOffset(topic, groupId)
      if (offsetMap != null && offsetMap.nonEmpty) {
        // Resume from the offsets saved in Redis
        baseInputDStream = OffsetKafkaUtil.getKafkaStream(topic, ssc, offsetMap, groupId)
      } else {
        // No saved offsets: start from the latest offset
        baseInputDStream = OffsetKafkaUtil.getKafkaStream(topic, ssc, groupId)
      }
      val offsetDStream = filterOffsetRange(baseInputDStream)

      // Business processing
      fun(offsetDStream, this)
    }

    private def filterOffsetRange(dStream: InputDStream[ConsumerRecord[String, String]]): DStream[ConsumerRecord[String, String]] = {
      val offsetDStream: DStream[ConsumerRecord[String, String]] = dStream.transform(rdd => {
        // The underlying KafkaRDD carries the offset ranges of this batch
        this.offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      })
      offsetDStream
    }

    def saveOffsetrange(): Unit = {
      OffsetRedisUtil.saveOffset(this.topic, this.groupId, this.offsetRanges)
    }
  }
}

OffsetUtil

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

trait OffsetUtil {
  // Get the offsets
  def getOffset(topicName: String, groupId: String): Map[TopicPartition, Long]

  // Save the offsets
  def saveOffset(topicName: String, groupId: String, offsetArray: Array[OffsetRange])
}

object OffsetRedisUtil extends OffsetUtil {

  /**
   * 1- Store the offsets consumed by a consumer group on a topic in Redis.
   *
   * Redis storage format:
   *   key:   "offset" + topic + consumer group, e.g. offset:xx_topic:xx_groupId
   *   value: a Hash, where the hash key is the partition ID and the hash value is the offset
   *
   * @param topicName topic name
   * @param groupId   consumer group
   */
  override def saveOffset(topicName: String, groupId: String, offsetArray: Array[OffsetRange]): Unit = {
    val keyName: String = createKeyName(topicName, groupId)

    // 1- Collect the latest offset of each partition into a map
    val map = new util.HashMap[String, String]()
    for (elem <- offsetArray) {
      map.put(elem.partition.toString, elem.untilOffset.toString)
    }

    // 2- Write the map to Redis as a hash
    if (map.size() > 0) {
      JedisUtil.hmset(keyName, map)
    }
  }

  /** 2- Get the offsets of a consumer group on a topic from Redis */
  override def getOffset(topicName: String, groupId: String): Map[TopicPartition, Long] = {
    val keyName: String = createKeyName(topicName, groupId)
    val map: util.HashMap[String, String] = JedisUtil.hgetAll(keyName)

    // Convert HashMap[String, String] to Map[TopicPartition, Long]
    import scala.collection.JavaConverters._
    map.asScala.map {
      case (partitionId, offset) =>
        val partition = new TopicPartition(topicName, partitionId.toInt)
        (partition, offset.toLong)
    }.toMap
  }

  def createKeyName(topicName: String, groupId: String): String = {
    "offset" + ":" + topicName + ":" + groupId
  }
}
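
The code above calls a JedisUtil helper that the article does not show. A minimal sketch of what it might look like, backed by a Jedis connection pool (host and port are placeholders):

import java.util

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object JedisUtil {
  // Placeholder connection settings
  private val pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379)

  def hmset(key: String, map: util.Map[String, String]): Unit = {
    val jedis: Jedis = pool.getResource
    try jedis.hmset(key, map) finally jedis.close()
  }

  def hgetAll(key: String): util.HashMap[String, String] = {
    val jedis: Jedis = pool.getResource
    try new util.HashMap[String, String](jedis.hgetAll(key)) finally jedis.close()
  }
}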

OffsetKafkaUtil

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object OffsetKafkaUtil {

  var param: collection.mutable.Map[String, Object] = collection.mutable.Map(
    "bootstrap.servers" -> "192.168.2.102:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "auto.offset.reset" -> "latest",                    // latest: reset to the latest offset when no offset is found
    "enable.auto.commit" -> (false: java.lang.Boolean)  // do not auto-commit offsets
  )

  // Read data from the latest offset position
  def getKafkaStream(topic: String, ssc: StreamingContext, groupId: String): InputDStream[ConsumerRecord[String, String]] = {
    param("group.id") = groupId
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), param)
    )
  }

  // Read data from the specified offset positions
  def getKafkaStream(topic: String, ssc: StreamingContext, offsetMap: Map[TopicPartition, Long], groupId: String): InputDStream[ConsumerRecord[String, String]] = {
    param("group.id") = groupId
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), param, offsetMap)
    )
  }
}

Other

pom.xml

 	   <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>

		<dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>