Introduction
- Spark Streaming consumes from Kafka with exactly-once semantics, ensuring that messages are neither lost nor processed repeatedly.
Semantics of message processing
At Least Once
- The message is processed at least once
- No data loss, but data may be duplicated.
At Most Once
- Messages are processed at most once
- No data duplication, but data may be lost.
Exactly Once
- The message is processed just once
- It does not mean a message is physically processed only once; rather it combines message reliability with message idempotency, so that from the point of view of upstream and downstream systems there is neither data loss nor data duplication.
- In practice, Exactly Once semantics are achieved through At Least Once + idempotent processing.
Process and problems of consuming data
By default the Kafka consumer auto-commits offsets (every 5 seconds), so two failure scenarios can occur:
tip
Offsets record how far each consumer group has read in each partition of a topic and are stored in Kafka's internal __consumer_offsets topic.
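For reference, these are the consumer properties behind the auto-commit behaviour just described; the values shown are the Kafka defaults and the broker address is only a placeholder:

// Default offset-commit settings of the Kafka consumer
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",            // placeholder broker address
  "group.id" -> "consumer-007",
  "enable.auto.commit" -> (true: java.lang.Boolean),  // default: offsets are committed automatically
  "auto.commit.interval.ms" -> "5000"                 // default: every 5000 ms, i.e. 5 seconds
)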
Case 1: The offset is committed before the message is processed
- If the offset is committed first and the process crashes while the data is still being processed and has not yet been persisted, the next run starts consuming from the committed offset, so the data that was never persisted is lost.
Case 2: The offset is committed after the message is processed
- If the process crashes after the message has been processed but before the latest offset is committed, the next run resumes from the old offset, so the same data may be consumed repeatedly.
Consuming a message therefore involves two steps, processing the message and committing the offset, and we cannot guarantee that these two steps are atomic (both succeed or both fail), which is what leads to data loss or repeated consumption.
To realize Exactly Once
Method 1: Use transactions
- The key to implementing Exactly Once semantics is guaranteeing the atomicity of processing the message and committing the offset. By putting these two operations into a single transaction, it no longer matters whether the message is processed first or the offset committed first: the message can be neither lost nor duplicated.
Implementation
- For example, manually maintain the offsets of consumed messages in MySQL and write the processed data into MySQL as well. Since MySQL supports transactions, the two writes can be made atomic, as sketched below.
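The following is a minimal sketch of that idea, not a complete implementation: the user_log and offsets tables, the column names and the connection settings are assumptions, but the point is that the data write and the offset write share one JDBC transaction, so they either both succeed or both roll back.

import java.sql.{Connection, DriverManager}

object TransactionalSink {
  // Hypothetical connection settings; adjust to the real environment
  private val url = "jdbc:mysql://localhost:3306/demo"

  /** Save one record and its offset atomically: both are committed together or both roll back. */
  def saveWithOffset(id: Int, name: String, topic: String, groupId: String,
                     partition: Int, untilOffset: Long): Unit = {
    val conn: Connection = DriverManager.getConnection(url, "user", "password")
    try {
      conn.setAutoCommit(false)

      // 1. Write the business data
      val dataStmt = conn.prepareStatement("INSERT INTO user_log(id, name) VALUES (?, ?)")
      dataStmt.setInt(1, id)
      dataStmt.setString(2, name)
      dataStmt.executeUpdate()

      // 2. Write the offset in the same transaction (REPLACE keeps one row per topic/group/partition)
      val offsetStmt = conn.prepareStatement(
        "REPLACE INTO offsets(topic, group_id, `partition`, `offset`) VALUES (?, ?, ?, ?)")
      offsetStmt.setString(1, topic)
      offsetStmt.setString(2, groupId)
      offsetStmt.setInt(3, partition)
      offsetStmt.setLong(4, untilOffset)
      offsetStmt.executeUpdate()

      conn.commit() // data and offset become visible together
    } catch {
      case e: Exception =>
        conn.rollback() // neither the data nor the offset is persisted
        throw e
    } finally {
      conn.close()
    }
  }
}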
Disadvantages:
- It ties you to the storage layer: only transactional storage can be used.
- Transaction performance is not high.
- A single storage layer is also prone to single points of failure and excessive load; if the storage layer is distributed, distributed transactions are required, which adds complexity.
Method 2: Manually commit the offset + idempotency
- Ensure the data is processed and persisted before committing the offset. The offset commit itself may still fail and lead to repeated consumption, so the data must be saved idempotently: no matter how many times the same record is saved, the result is the same and no duplicates appear.
Implementing idempotent saves
- Some storage layers support idempotent operations natively, for example MySQL primary keys and unique indexes: inserting a record with the same id once has the same effect as inserting it 100 times. Elasticsearch document ids are likewise naturally idempotent (later writes simply overwrite), and the same holds for Redis keys. Far more storage layers support idempotent writes than support transactions, and they perform better (a minimal upsert sketch follows this list).
- If you use a storage layer that does not support idempotent operations, you may need to implement idempotency or deduplication yourself.
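As an illustration of the natural idempotency described above, here is a minimal sketch. The user_log table (with id as its primary key) and the connection settings are assumptions; the upsert means that writing the same record any number of times leaves exactly one row.

import java.sql.{Connection, DriverManager}

object IdempotentSink {
  private val url = "jdbc:mysql://localhost:3306/demo" // placeholder connection settings

  /** Idempotent save: replaying the same id leaves a single row with the latest values. */
  def upsert(id: Int, name: String): Unit = {
    val conn: Connection = DriverManager.getConnection(url, "user", "password")
    try {
      val stmt = conn.prepareStatement(
        "INSERT INTO user_log(id, name) VALUES (?, ?) ON DUPLICATE KEY UPDATE name = VALUES(name)")
      stmt.setInt(1, id)
      stmt.setString(2, name)
      stmt.executeUpdate()
    } finally {
      conn.close()
    }
  }
}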
Pseudo-code implementation:
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import ExactOneUtil.Builder

object Test {
  case class UserLog(id: Int, name: String)

  def main(args: Array[String]): Unit = {
    // 1. Create the StreamingContext
    val conf: SparkConf = new SparkConf().setAppName("exactly-once-demo").setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))

    // 2. Topic and consumer group whose offsets are kept in Redis
    val topic = "topic-log"
    val groupId = "consumer-007"

    // 3. Build the Kafka stream (resuming from the saved offsets) and run the business logic
    ExactOneUtil.Builder().streamingContext(ssc).topicGroup(topic, groupId).build(businessProcessing)

    ssc.start()
    ssc.awaitTermination()
  }

  /** Business processing */
  def businessProcessing(offsetDStream: DStream[ConsumerRecord[String, String]], builder: Builder): Unit = {
    // Parse each Kafka record into a JSON object
    val jsonObjectDStream: DStream[JSONObject] = offsetDStream.map(msg => {
      val jsonObj: JSONObject = JSON.parseObject(msg.value())
      // ...
      jsonObj
    })

    jsonObjectDStream.foreachRDD(rdd => {
      rdd.foreachPartition(jsonObjList => {
        // If id is the primary key, the save is naturally idempotent no matter how many times it runs
        val resultData: Iterator[UserLog] = jsonObjList.map { obj =>
          UserLog(obj.getIntValue("id"), obj.getString("name"))
        }
        val userLogs = resultData.toList
        // ... save userLogs to MySQL, Elasticsearch or Redis
      })
      // Commit the offsets only after the whole batch has been processed and persisted
      builder.saveOffsetRange()
    })
  }
}
ExactOneUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

object ExactOneUtil {
  var builder: Builder = _

  def Builder(): Builder = {
    this.builder = new Builder()
    this.builder
  }

  def stop(): Unit = {
    this.builder.saveOffsetRange()
  }

  class Builder {
    var topic: String = _
    var groupId: String = _
    var ssc: StreamingContext = _
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange] // one entry per partition: partition id and its offset range

    def topicGroup(topic: String, groupId: String): Builder = {
      this.topic = topic
      this.groupId = groupId
      this
    }

    def streamingContext(ssc: StreamingContext): Builder = {
      this.ssc = ssc
      this
    }

    def build(fun: (DStream[ConsumerRecord[String, String]], Builder) => Unit): Unit = {
      var baseInputDStream: InputDStream[ConsumerRecord[String, String]] = null
      // Resume from the offsets saved in Redis, or from the default position if none exist
      val offsetMap: Map[TopicPartition, Long] = RedisOffsetUtil.getOffset(topic, groupId)
      if (offsetMap != null && offsetMap.nonEmpty) {
        baseInputDStream = OffsetKafkaUtil.getKafkaStream(topic, ssc, offsetMap, groupId)
      } else {
        baseInputDStream = OffsetKafkaUtil.getKafkaStream(topic, ssc, groupId)
      }
      val offsetDStream = filterOffsetRange(baseInputDStream)
      // Business processing
      fun(offsetDStream, this)
    }

    private def filterOffsetRange(dStream: InputDStream[ConsumerRecord[String, String]]): DStream[ConsumerRecord[String, String]] = {
      val offsetDStream: DStream[ConsumerRecord[String, String]] = dStream.transform(rdd => {
        // The underlying KafkaRDD carries the offset ranges of this batch
        this.offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      })
      offsetDStream
    }

    def saveOffsetRange(): Unit = {
      RedisOffsetUtil.saveOffset(this.topic, this.groupId, this.offsetRanges)
    }
  }
}
OffsetUtil
import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

trait OffsetUtil {
  // Get the saved offsets
  def getOffset(topicName: String, groupId: String): Map[TopicPartition, Long]
  // Save the offsets
  def saveOffset(topicName: String, groupId: String, offsetArray: Array[OffsetRange]): Unit
}

object RedisOffsetUtil extends OffsetUtil {
  /**
   * 1- Store the offsets consumed by a consumer group on a topic in Redis.
   *
   * Redis storage layout:
   *   key:   "offset:<topic>:<groupId>"
   *   value: a Redis hash, hash field = partition id, hash value = offset
   *
   * @param topicName topic name
   * @param groupId   consumer group
   */
  override def saveOffset(topicName: String, groupId: String, offsetArray: Array[OffsetRange]): Unit = {
    val keyName: String = createKeyName(topicName, groupId)
    // 1- Collect the latest (until) offset of each partition into a map
    val map = new util.HashMap[String, String]()
    for (elem <- offsetArray) {
      map.put(elem.partition.toString, elem.untilOffset.toString)
    }
    if (map.size() > 0) {
      JedisUtil.hmset(keyName, map)
    }
  }

  /** 2- Read the offsets of a consumer group on a topic back from Redis */
  override def getOffset(topicName: String, groupId: String): Map[TopicPartition, Long] = {
    val keyName: String = createKeyName(topicName, groupId)
    val map: util.Map[String, String] = JedisUtil.hgetAll(keyName)
    // Convert java.util.Map[String, String] to Map[TopicPartition, Long]
    import scala.collection.JavaConverters._
    map.asScala.map {
      case (partitionId, offset) =>
        val partition = new TopicPartition(topicName, partitionId.toInt)
        (partition, offset.toLong)
    }.toMap
  }

  def createKeyName(topicName: String, groupId: String): String = {
    "offset" + ":" + topicName + ":" + groupId
  }
}
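RedisOffsetUtil calls a JedisUtil helper that the original code does not show. Below is a minimal sketch of what it might look like, assuming a Redis instance on localhost and the jedis client declared in the pom.xml further down; only the two calls used above are wrapped, and a production version would use a JedisPool instead of one connection per call.

import java.util

import redis.clients.jedis.Jedis

object JedisUtil {
  // Placeholder connection settings; open a short-lived connection per call for simplicity
  private def withJedis[T](f: Jedis => T): T = {
    val jedis = new Jedis("localhost", 6379)
    try f(jedis) finally jedis.close()
  }

  def hmset(key: String, hash: util.Map[String, String]): Unit =
    withJedis(_.hmset(key, hash))

  def hgetAll(key: String): util.Map[String, String] =
    withJedis(_.hgetAll(key))
}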
OffsetKafkaUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object OffsetKafkaUtil {
  val param: collection.mutable.Map[String, Object] = collection.mutable.Map(
    "bootstrap.servers" -> "192.168.2.102:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "auto.offset.reset" -> "latest",                    // latest: start from the latest offset when no saved offset exists
    "enable.auto.commit" -> (false: java.lang.Boolean)  // never auto-commit; offsets are committed manually
  )

  // Read data from the default (latest) offset position
  def getKafkaStream(topic: String, ssc: StreamingContext, groupId: String): InputDStream[ConsumerRecord[String, String]] = {
    param("group.id") = groupId
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), param)
    )
  }

  // Read data from the specified offset positions
  def getKafkaStream(topic: String, ssc: StreamingContext, offsetMap: Map[TopicPartition, Long], groupId: String): InputDStream[ConsumerRecord[String, String]] = {
    param("group.id") = groupId
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), param, offsetMap)
    )
  }
}
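As an aside, if you do not want an external offset store at all, the spark-streaming-kafka-0-10 API can also commit offsets back to Kafka itself once a batch has been persisted; this is still manual commit plus idempotent saves, just with Kafka's __consumer_offsets topic as the store. A minimal sketch:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

object KafkaCommitExample {
  /** Persist each batch idempotently first, then commit its offsets back to Kafka. */
  def process(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... idempotently save the records of this batch ...
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
  }
}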
Other
pom.xml
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>