This technical column is the author's (Qin Kaixin) distillation and refinement of his day-to-day work, combined with in-depth study of a large number of online resources and professional books. The blog summarizes and shares cases extracted from real business environments, and offers tuning suggestions for business applications as well as capacity planning advice for cluster environments. Please continue to follow this blog. QQ email address: [email protected]; for any academic exchange, please feel free to get in touch.

1 Kafka offset

1.1 Kafka prior to 0.9

The offsets here refer to Kafka consumer offsets. Before version 0.9, Kafka consumer offsets were saved in ZooKeeper by default (under /consumers/<group.id>/offsets/<topic>/<partition>), so zookeeper.hosts had to be specified when the consumer was initialized.
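For reference, a minimal sketch of reading such a pre-0.9 offset with the I0Itec ZkClient (the same client used in the Spark examples later in this post); the ZooKeeper address, group, topic and partition below are placeholders:

    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer

    object OldZkOffsetReader {
      def main(args: Array[String]): Unit = {
        // Raw-bytes serializer, because the old consumer wrote the offset as a plain string
        val zkClient = new ZkClient("127.0.0.1:2181", 30000, 30000, new BytesPushThroughSerializer())
        val path = "/consumers/my-group/offsets/my-topic/0" // /consumers/<group.id>/offsets/<topic>/<partition>
        if (zkClient.exists(path)) {
          val bytes: Array[Byte] = zkClient.readData(path)
          println(s"committed offset for my-topic-0: ${new String(bytes, "UTF-8")}")
        }
        zkClient.close()
      }
    }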

1.2 Kafka 0.9 and later

As the Kafka consumer saw more and more real-world use, the community found it inappropriate for the old consumer to commit offsets to ZooKeeper. ZooKeeper is essentially a coordination service and is not suited to serving as a storage component for offsets; frequent, highly concurrent reads and writes are not what ZooKeeper is good at. So starting with version 0.9, the consumer commits offsets to an internal Kafka topic (__consumer_offsets), which by default has 50 partitions with three replicas per partition.

1.3 Message processing semantics

  • At-most-once: a message may be lost but will never be processed more than once.
  • At-least-once: a message is never lost but may be processed more than once.
  • Exactly-once: a message is processed once and only once.
  • If the consumer commits the offset before processing the message, it achieves at-most-once: if the consumer crashes between committing the offset and processing the message, it restarts from the new offset and the unprocessed message is lost.
  • Conversely, if the offset is committed after the message has been processed, at-least-once semantics can be achieved: if processing succeeds but the consumer crashes before the commit, the message is processed again after the restart. Because Kafka has no way to ensure that message processing and the offset commit complete in the same transaction, Kafka provides at-least-once processing semantics by default. A sketch contrasting the two commit orderings follows this list.
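A minimal sketch of the two commit orderings with the plain Kafka consumer API; the process method is a stand-in for real business logic and the consumer construction is omitted, only the ordering of commitSync and the processing matters:

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import scala.collection.JavaConverters._

    object CommitOrdering {
      // Commit first, then process: a crash after the commit loses the unprocessed records => at-most-once
      def atMostOnceLoop(consumer: KafkaConsumer[String, String]): Unit = {
        while (true) {
          val records = consumer.poll(1000) // poll(long) as in the 0.9–1.x clients
          consumer.commitSync()
          for (r <- records.asScala) process(r.value())
        }
      }

      // Process first, then commit: a crash before the commit replays the batch => at-least-once
      def atLeastOnceLoop(consumer: KafkaConsumer[String, String]): Unit = {
        while (true) {
          val records = consumer.poll(1000)
          for (r <- records.asScala) process(r.value())
          consumer.commitSync()
        }
      }

      private def process(value: String): Unit = println(value) // stand-in for real business logic
    }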

1.4 Kafka offset commit modes

  • By default, the consumer commits offsets automatically with an interval of 5 seconds, which can be controlled with the auto.commit.interval.ms parameter.

    The advantage of automatic offset commits is lower development cost: users do not have to handle offset commits themselves. The disadvantage is that users lose fine-grained control over when offsets are committed, which matters especially when exactly-once processing semantics are required (in that case, use manual offset commits).

  • With manual offset commits, the user decides when a message has actually been processed and the offset can be committed, and can therefore make sure the offset is committed only after the message has really been processed. Automatic commits cannot guarantee this timing, so manual commits must be used in such cases.

    Setting up manual offset commits is simple: set enable.auto.commit=false when building the KafkaConsumer and then call the commitSync or commitAsync methods after processing. The configuration for both modes is sketched below.
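A minimal configuration sketch contrasting the two modes, using standard Kafka consumer settings; the broker address and group id are placeholders:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.KafkaConsumer

    object CommitModeConfig {
      // Automatic commits: the consumer commits in the background every 5 seconds
      def autoCommitProps(servers: String, group: String): Properties = {
        val p = new Properties()
        p.put("bootstrap.servers", servers)
        p.put("group.id", group)
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        p.put("enable.auto.commit", "true")
        p.put("auto.commit.interval.ms", "5000")
        p
      }

      // Manual commits: disable auto commit, then call commitSync()/commitAsync() after processing
      def manualCommitProps(servers: String, group: String): Properties = {
        val p = autoCommitProps(servers, group)
        p.put("enable.auto.commit", "false")
        p.remove("auto.commit.interval.ms")
        p
      }

      def main(args: Array[String]): Unit = {
        val consumer = new KafkaConsumer[String, String](manualCommitProps("127.0.0.1:9092", "demo-group"))
        consumer.close()
      }
    }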

2 Spark offset handling

2.1 Setting auto.offset.reset

  • For auto.offset.reset, I recommend setting it to "earliest". On the first run, __consumer_offsets holds no offset information, so messages are read from the beginning. On the second run, since __consumer_offsets already holds the consumed offsets, data is read from the offsets recorded there.

  • In addition, with ZooKeeper-based offset management you only need to delete the corresponding node to re-read the data from scratch, which is very convenient (see the sketch after this list). However, if you want to read only from the latest position and do not need old messages, set it to "latest".

    latest: for a partition with a committed offset, read from that offset; for a partition without a committed offset, read from the latest data.
    none: if an offset has been committed, read from it; if no offset has been committed, an error is reported.
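A minimal sketch of the "delete the ZooKeeper node to re-read from scratch" approach mentioned above; the ZooKeeper address and the offset path (which follows the layout used by ZkKafkaOffset in section 2.3 below) are placeholders:

    import org.I0Itec.zkclient.ZkClient

    object ResetZkOffsets {
      def main(args: Array[String]): Unit = {
        val zkClient = new ZkClient("127.0.0.1:2181", 30000)
        // Path layout from ZkKafkaOffset below: /kafka/ss/offset/<offsetId>
        val offsetRoot = "/kafka/ss/offset/0123456789abcdef"
        // With the stored offsets gone, the next run finds nothing in ZooKeeper and
        // falls back to auto.offset.reset (earliest => read from the beginning)
        zkClient.deleteRecursive(offsetRoot)
        zkClient.close()
      }
    }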

2.2 Subscribing to Kafka topics

  • Subscribing to topics by regular expression has the following benefits:

    You do not need to list the topic names explicitly: one or two topics are fine, but listing dozens is too much trouble. You also get the effect of dynamic subscription: newly added topics that match the regex are read as well.

    stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(topicStr), kafkaConf, customOffset))
  • Partition placement strategies. KafkaUtils.createDirectStream creates consumers for the given topics and cluster address and returns a DStream of the received input data; the LocationStrategy decides on which executors the partitions are consumed (a PreferFixed sketch appears after this list):

    LocationStrategies.PreferConsistent: distribute partitions evenly across all available executors (the usual choice).
    LocationStrategies.PreferBrokers: if the executors run on the same machines as the Kafka brokers, prefer the executor on the partition's broker.
    LocationStrategies.PreferFixed: if the load across machines is uneven, specify an explicit mapping from partitions to hosts; partitions not in the map fall back to PreferConsistent.
  • Spark Streaming serialization issues

    Variables or objects used only in the driver do not need to be serializable; variables or objects shipped to the executors must be serializable. It is therefore recommended to perform only the data transformations on the executors and to handle the results of the processing (such as offset bookkeeping) in the driver.

    stream.foreachRDD(rdd => {
      // Driver code: obtain this batch's offset ranges and record the starting offsets
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      kafkaOffset.updateOffset(offsetRanges)

      // Executor code run area: transform the data
      val resultRDD = rdd.map(xxxxxxxx)
      // endregion

      // Store the result
      resultRDD.saveToEs(xxxxxx)

      // Driver code: commit the offsets after the batch has been processed
      kafkaOffset.commitOffset(offsetRanges)
    })
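As mentioned above, a minimal sketch of LocationStrategies.PreferFixed; the topic and host names are placeholders:

    import java.util.{HashMap => JHashMap}
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.{LocationStrategies, LocationStrategy}

    object FixedLocationExample {
      // Pin hot partitions to specific executor hosts; partitions not listed in the
      // map are placed consistently, as with PreferConsistent.
      def fixedStrategy(): LocationStrategy = {
        val hostMap = new JHashMap[TopicPartition, String]()
        hostMap.put(new TopicPartition("my-topic", 0), "executor-host-1")
        hostMap.put(new TopicPartition("my-topic", 1), "executor-host-2")
        LocationStrategies.PreferFixed(hostMap)
      }
    }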

2.3 Manually managing offsets with old-style ZooKeeper: code walkthrough

  • The ZkKafkaOffset class below implements ZooKeeper-based offset management. Any ZooKeeper management tool can be used to modify or delete the node data, so if you want to re-read messages from the beginning you only need to delete the corresponding ZooKeeper node.

    import org.I0Itec.zkclient.ZkClient
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka010.OffsetRange
    import scala.collection.JavaConverters._

    class ZkKafkaOffset(getClient: () => ZkClient, getZkRoot: () => String) {

      // lazy vals give a lazy singleton: this avoids serialization problems and makes broadcasting easy
      lazy val zkClient: ZkClient = getClient()
      lazy val zkRoot: String = getZkRoot()

      // offsetId = md5(groupId + join(topics)); initialize the zkRoot node for the offsets
      def initOffset(offsetId: String): Unit = {
        if (!zkClient.exists(zkRoot)) {
          zkClient.createPersistent(zkRoot, true)
        }
      }

      // Read the offset information from zkRoot
      def getOffset(): Map[TopicPartition, Long] = {
        val keys = zkClient.getChildren(zkRoot)
        var initOffsetMap: Map[TopicPartition, Long] = Map()
        if (!keys.isEmpty) {
          for (k: String <- keys.asScala) {
            val ks = k.split("!")
            val value: Long = zkClient.readData(zkRoot + "/" + k)
            initOffsetMap += (new TopicPartition(ks(0), Integer.parseInt(ks(1))) -> value)
          }
        }
        initOffsetMap
      }

      // Record the offset of a single consumed record
      def updateOffset(consumeRecord: ConsumerRecord[String, String]): Boolean = {
        val path = zkRoot + "/" + consumeRecord.topic + "!" + consumeRecord.partition
        zkClient.writeData(path, consumeRecord.offset())
        true
      }

      // Record the starting offset (fromOffset) of each partition of the current batch
      def updateOffset(offsetRanges: Array[OffsetRange]): Boolean = {
        for (offset: OffsetRange <- offsetRanges) {
          val path = zkRoot + "/" + offset.topic + "!" + offset.partition
          if (!zkClient.exists(path)) {
            zkClient.createPersistent(path, offset.fromOffset)
          } else {
            zkClient.writeData(path, offset.fromOffset)
          }
        }
        true
      }

      // Commit the end offset (untilOffset) of each partition after the batch has been processed
      def commitOffset(offsetRanges: Array[OffsetRange]): Boolean = {
        for (offset: OffsetRange <- offsetRanges) {
          val path = zkRoot + "/" + offset.topic + "!" + offset.partition
          if (!zkClient.exists(path)) {
            zkClient.createPersistent(path, offset.untilOffset)
          } else {
            zkClient.writeData(path, offset.untilOffset)
          }
        }
        true
      }

      override def finalize(): Unit = {
        zkClient.close()
      }
    }

    object ZkKafkaOffset {
      def apply(cong: SparkConf, offsetId: String): ZkKafkaOffset = {
        val getClient = () => {
          val zkHost = cong.get("kafka.zk.hosts", "127.0.0.1:2181")
          new ZkClient(zkHost, 30000)
        }
        val getZkRoot = () => {
          val zkRoot = "/kafka/ss/offset/" + offsetId
          zkRoot
        }
        new ZkKafkaOffset(getClient, getZkRoot)
      }
    }
  • Spark Streaming consuming Kafka messages

    Step 1: read the stored offsets: val customOffset: Map[TopicPartition, Long] = kafkaOffset.getOffset()
    Step 2: create the stream with these offsets: stream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaConf, customOffset))
    Step 3: after processing, commit the offsets: kafkaOffset.commitOffset(offsetRanges)

    import java.util.regex.Pattern
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
    import scala.collection.JavaConverters._
    // Props, StrUtil, SecureUtil and StaticLog are Hutool utility classes (imports omitted)

    object RtDataLoader {
      def main(args: Array[String]): Unit = {
        val props = new Props("xxx.properties")
        val groupId = props.getStr("groupId", "")
        if (StrUtil.isBlank(groupId)) {
          StaticLog.error("groupId is empty")
          return
        }
        val kfkServers = props.getStr("kfk_servers")
        if (StrUtil.isBlank(kfkServers)) {
          StaticLog.error("bootstrap.servers is empty")
          return
        }
        val topicStr = props.getStr("topics")
        if (StrUtil.isBlank(topicStr)) {
          StaticLog.error("topics is empty")
          return
        }

        // Kafka configuration
        val topics = topicStr.split(",")
        val kafkaConf = Map[String, Object](
          "bootstrap.servers" -> kfkServers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> groupId,
          "receive.buffer.bytes" -> (102400: java.lang.Integer),
          "max.partition.fetch.bytes" -> (5252880: java.lang.Integer),
          "auto.offset.reset" -> "earliest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val conf = new SparkConf().setAppName("ss-kafka").setIfMissing("spark.master", "local[2]")
        // Streaming-related configuration
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
        conf.set("spark.streaming.backpressure.enabled", "true")
        conf.set("spark.streaming.backpressure.initialRate", "1000")
        // ZooKeeper hosts used for offset storage
        conf.set("kafka.zk.hosts", props.getStr("zk_hosts", "127.0.0.1:2181"))

        // Create the StreamingContext
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
        val ssc = new StreamingContext(sc, Seconds(5))

        // Offset management
        val offsetId = SecureUtil.md5(groupId + topics.mkString(","))
        val kafkaOffset = ZkKafkaOffset(ssc.sparkContext.getConf, offsetId)
        kafkaOffset.initOffset(offsetId)
        val customOffset: Map[TopicPartition, Long] = kafkaOffset.getOffset()

        // Create the stream
        var stream: InputDStream[ConsumerRecord[String, String]] = null
        if (topicStr.contains("*")) {
          StaticLog.warn("Reading Kafka topics by regular expression: " + topicStr)
          stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(topicStr), kafkaConf, customOffset))
        } else {
          StaticLog.warn("Reading Kafka topics: " + topicStr)
          stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaConf, customOffset))
        }

        // Consume the data
        stream.foreachRDD(rdd => {
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          kafkaOffset.updateOffset(offsetRanges)
          StaticLog.info("Start processing the RDD data!")
          // ... business processing goes here ...
          kafkaOffset.commitOffset(offsetRanges)
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }

3 Flink offset handling

3.1 Flink consumer exactly-once semantics

  • setStartFromGroupOffsets() [default consumption strategy]: reads the last committed offsets by default. If no committed offsets can be found when the application starts for the first time, data is consumed according to the value of auto.offset.reset.

  • setStartFromEarliest(): starts consuming from the earliest data, ignoring any stored offset information.

  • setStartFromLatest(): starts consuming from the latest data, ignoring any stored offset information.

  • setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>): starts consuming from the specified offsets (a sketch appears at the end of this section).

  • When checkpointing is enabled, the Kafka consumer periodically stores the Kafka offsets together with the state of the other operators. When a job fails and restarts, Flink recovers from the latest checkpoint and re-consumes the data in Kafka from there.

  • Enabling checkpoints: env.enableCheckpointing(5000); // checkpoint every 5 s

  • Kafka consumer offset auto-committing comes in the following two flavors, depending on whether checkpointing is enabled for the job:

    (1) With Flink checkpointing disabled: Kafka's own mechanism is used, configured with the following two consumer properties:

      enable.auto.commit
      auto.commit.interval.ms

    (2) With checkpointing enabled: offsets are committed only when a checkpoint completes. This keeps the offsets committed to Kafka consistent with the offsets in the checkpointed state. The behavior is controlled with this parameter:

      setCommitOffsetsOnCheckpoints(boolean)

    This parameter defaults to true: the offsets are committed on checkpoints, and Kafka's automatic commit mechanism is ignored.

    // Get the Flink execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint configuration
    env.enableCheckpointing(5000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // Set the state backend
    env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints", true));

    String topic = "kafkaConsumer";
    Properties prop = new Properties();
    prop.setProperty("bootstrap.servers", "SparkMaster:9092");
    prop.setProperty("group.id", "kafkaConsumerGroup");

    FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);
    myConsumer.setStartFromGroupOffsets();           // the default consumption strategy
    myConsumer.setCommitOffsetsOnCheckpoints(true);

    DataStreamSource<String> text = env.addSource(myConsumer);
    text.print().setParallelism(1);

    env.execute("StreamingFromCollection");
  • The Flink Kafka consumer lets you configure how offsets are committed back to the Kafka brokers (or to ZooKeeper). Note that the Flink Kafka consumer does not rely on these committed offsets for fault tolerance; they only expose the consumer's progress for monitoring purposes.

  • FlinkKafkaConsumer provides a robust mechanism for exactly-once consumption of Kafka data under high throughput, and its API is easy to use, configure and monitor.

  • A barrier is a separator between checkpoints: data before it belongs to the previous checkpoint and data after it belongs to the next one. The barrier is injected by the source (for example FlinkKafkaConsumer), travels through the stream interleaved with the data, and is passed downstream from operator to operator until it reaches the sinks. Once the sinks have received the barrier, the checkpoint is complete (its state is marked completed and stored in the state backend), meaning all data before the barrier has been processed and written to the sinks.

  • Flink’s asynchronous checkpointing is something we configure ourselves by calling enableCheckpointing(). A checkpoint completes only when its barrier has been received by all operators of the job; only then is the offset recorded and committed, which is how exactly-once is guaranteed.
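As mentioned above, a minimal sketch of setStartFromSpecificOffsets, written in Scala for consistency with the Spark examples; the topic name and offsets are placeholders:

    import java.util.{HashMap => JHashMap}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition

    object SpecificOffsetsExample {
      // Tell the consumer exactly where to start for each partition
      def startFromSpecificOffsets(consumer: FlinkKafkaConsumer011[String]): Unit = {
        val specificOffsets = new JHashMap[KafkaTopicPartition, java.lang.Long]()
        specificOffsets.put(new KafkaTopicPartition("my-topic", 0), java.lang.Long.valueOf(23L))
        specificOffsets.put(new KafkaTopicPartition("my-topic", 1), java.lang.Long.valueOf(31L))
        consumer.setStartFromSpecificOffsets(specificOffsets)
      }
    }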

3.2 Flink producer exactly-once semantics

  • Kafka producer fault tolerance: Kafka 0.9 and 0.10

    With Flink checkpointing enabled, FlinkKafkaProducer09 and FlinkKafkaProducer010 can provide at-least-once semantics, provided the following are set: setLogFailuresOnly(false) and setFlushOnCheckpoint(true). Note: it is also recommended to increase the number of retries of the Kafka producer (a sketch appears at the end of this section).
  • Kafka 0.11 producer fault tolerance: if Flink checkpointing is enabled, FlinkKafkaProducer011 can provide exactly-once semantics, but the desired semantic has to be selected explicitly.

    // Available semantics: Semantic.NONE, Semantic.AT_LEAST_ONCE [default], Semantic.EXACTLY_ONCE

    // Checkpoint configuration
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // First solution: set the transaction timeout of FlinkKafkaProducer011
    prop.setProperty("transaction.timeout.ms", 60000 * 15 + "");
    // Second solution: raise Kafka's maximum transaction timeout, mainly via the Kafka broker configuration file

    // At-least-once producer:
    // FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());

    // Exactly-once producer:
    FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
            topic,
            new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
            prop,
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

    text.addSink(myProducer);
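For comparison, a minimal sketch of the at-least-once settings for FlinkKafkaProducer010 mentioned in the previous bullet, written in Scala for consistency with the Spark examples; the broker address and topic are placeholders:

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010

    object AtLeastOnceProducerExample {
      def buildProducer(): FlinkKafkaProducer010[String] = {
        val producer = new FlinkKafkaProducer010[String]("127.0.0.1:9092", "my-topic", new SimpleStringSchema())
        // Fail the job (and fall back to the last checkpoint) instead of only logging write failures
        producer.setLogFailuresOnly(false)
        // Flush pending records on every checkpoint so nothing is lost between checkpoints
        producer.setFlushOnCheckpoint(true)
        producer
      }
    }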

4 Summary


While writing this article I puzzled for a long time over how Kafka offset management can achieve exactly-once semantics. If you happen to read this article and have a better solution, please leave me a comment.

Qin Kaixin, at Inspur