Spark Streaming's receiver-less `createDirectStream` approach does not use Receivers. Instead, it creates an input stream that pulls messages directly from the Kafka brokers. This direct stream guarantees that each message pulled from Kafka is transformed exactly once, giving consistent semantics within a batch. However, when a job fails or is restarted, relying on Spark Streaming alone to resume processing from the correct consumption position (i.e. exactly-once semantics) is not ideal. Production environments therefore usually maintain Kafka consumption positions by managing offsets manually. This article shows how to manage Kafka offsets manually and covers the following topics:
- How to use MySQL to manage Kafka Offset
- How to use Redis to manage Kafka offsets
How to use MySQL to manage Kafka Offset
In the Spark Streaming application we can manage the Kafka offsets manually in code. The offsets can be obtained from the RDD generated in each micro-batch as follows:
KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
// Get the offsets of this batch
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
...
}
After obtaining the offsets, you can save them to an external storage system such as MySQL, Redis, ZooKeeper, or HBase.
Use case code
- Table used in MySQL to hold offsets
CREATE TABLE `topic_par_group_offset` (
`topic` varchar(255) NOT NULL,
`partition` int(11) NOT NULL,
`groupid` varchar(255) NOT NULL,
`offset` bigint(20) DEFAULT NULL,
PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ;
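For reference, once the job has written some offsets you can inspect them with a simple query; the group and topic values below are just the samples used later in ConfigConstants:
-- Inspect the stored offsets for the sample consumer group and topic
SELECT `topic`, `partition`, `groupid`, `offset`
FROM topic_par_group_offset
WHERE groupid = 'group_test' AND topic = 'test';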
- Constant configuration class: ConfigConstants
object ConfigConstants {
// Kafka configuration
val kafkaBrokers = "kms-2:9092,kms-3:9092,kms-4:9092"
val groupId = "group_test"
val kafkaTopics = "test"
val batchInterval = Seconds(5)
val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
val kafkaKeySer = "org.apache.kafka.common.serialization.StringSerializer"
val kafkaValueSer = "org.apache.kafka.common.serialization.StringSerializer"
val sparkSerializer = "org.apache.spark.serializer.KryoSerializer"
val batchSize = 16384
val lingerMs = 1
val bufferMemory = 33554432
// MySQL configuration
val user = "root"
val password = "123qwe"
val url = "jdbc:mysql://localhost:3306/kafka_offset"
val driver = "com.mysql.jdbc.Driver"
// Checkpoint configuration
val checkpointDir = "file:///e:/checkpoint"
val checkpointInterval = Seconds(10)
// Redis configuration
val redisAddress = "192.168.10.203"
val redisPort = 6379
val redisAuth = "123qwe"
val redisTimeout = 3000
}
- JDBC connection utility class: JDBCConnPool
object JDBCConnPool {
val log: Logger = Logger.getLogger(JDBCConnPool.getClass)
var dataSource: BasicDataSource = null
/** Create the data source. */
def getDataSource(): BasicDataSource = {
if (dataSource == null) {
dataSource = new BasicDataSource()
dataSource.setDriverClassName(ConfigConstants.driver)
dataSource.setUrl(ConfigConstants.url)
dataSource.setUsername(ConfigConstants.user)
dataSource.setPassword(ConfigConstants.password)
dataSource.setMaxTotal(50)
dataSource.setInitialSize(3)
dataSource.setMinIdle(3)
dataSource.setMaxIdle(10)
dataSource.setMaxWaitMillis(2 * 10000)
dataSource.setRemoveAbandonedTimeout(180)
dataSource.setRemoveAbandonedOnBorrow(true)
dataSource.setRemoveAbandonedOnMaintenance(true)
dataSource.setTestOnReturn(true)
dataSource.setTestOnBorrow(true)
}
return dataSource
}
/** Release the data source. */
def closeDataSource() = {
if (dataSource != null) {
dataSource.close()
}
}
/** Get a database connection. */
def getConnection(): Connection = {
var conn: Connection = null
try {
if (dataSource != null) {
conn = dataSource.getConnection()
} else {
conn = getDataSource().getConnection()
}
} catch {
case e: Exception =>
log.error(e.getMessage(), e)
}
conn
}
/** Close the PreparedStatement and the Connection. */
def closeConnection(ps: PreparedStatement, conn: Connection): Unit = {
if (ps != null) {
try {
ps.close()
} catch {
case e: Exception =>
log.error("Exception while closing the PreparedStatement: " + e.getMessage(), e)
}
}
if (conn != null) {
try {
conn.close()
} catch {
case e: Exception =>
log.error("Exception while closing the Connection: " + e.getMessage(), e)
}
}
}
}
- Kafka producer: KafkaProducerTest
object KafkaProducerTest {
def main(args: Array[String]): Unit = {
val props: Properties = new Properties()
props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)
props.put("batch.size", ConfigConstants.batchSize.asInstanceOf[Integer])
props.put("linger.ms", ConfigConstants.lingerMs.asInstanceOf[Integer])
props.put("buffer.memory", ConfigConstants.bufferMemory.asInstanceOf[Integer])
props.put("key.serializer", ConfigConstants.kafkaKeySer)
props.put("value.serializer", ConfigConstants.kafkaValueSer)
val producer: Producer[String, String] = new KafkaProducer[String, String](props)
val startTime: Long = System.currentTimeMillis()
for (i <- 1 to 100) {
producer.send(new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))
}
println("Time taken to send: " + (System.currentTimeMillis() - startTime))
producer.close()
}
}
- Read and save offsets: OffsetReadAndSave
This object reads and writes offsets from external storage, including MySQL and Redis.
object OffsetReadAndSave {
/** Get offsets from MySQL. @param groupid @param topic @return */
def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {
val conn = JDBCConnPool.getConnection()
val selectSql = "select * from topic_par_group_offset where groupid = ? and topic = ?"
val ppst = conn.prepareStatement(selectSql)
ppst.setString(1, groupid)
ppst.setString(2, topic)
val result: ResultSet = ppst.executeQuery()
// Topic partition offset
val topicPartitionOffset = mutable.Map[TopicPartition, Long]()
while (result.next()) {
val topicPartition: TopicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))
topicPartitionOffset += (topicPartition -> result.getLong("offset"))
}
JDBCConnPool.closeConnection(ppst, conn)
topicPartitionOffset
}
/** Get offsets from Redis. @param groupid @param topic @return */
def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {
val jedis: Jedis = JedisConnPool.getConnection()
var offsets = mutable.Map[TopicPartition, Long]()
val key = s"${topic}_${groupid}"
val fields: java.util.Map[String, String] = jedis.hgetAll(key)
for (partition <- JavaConversions.mapAsScalaMap(fields)) {
offsets.put(new TopicPartition(topic, partition._1.toInt), partition._2.toLong)
}
offsets.toMap
}
/** Write the offsets to MySQL. @param groupid consumer groupid @param offsetRange the batch's offset ranges */
def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
val conn = JDBCConnPool.getConnection()
val insertSql = "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(? ,? ,? ,?) "
val ppst = conn.prepareStatement(insertSql)
for (offset <- offsetRange) {
ppst.setString(1, offset.topic)
ppst.setInt(2, offset.partition)
ppst.setString(3, groupid)
ppst.setLong(4, offset.untilOffset)
ppst.executeUpdate()
}
JDBCConnPool.closeConnection(ppst, conn)
}
/** Save the offsets to Redis. @param groupid @param offsetRange */
def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]) = {
val jedis: Jedis = JedisConnPool.getConnection()
for (range <- offsetRange) {
val topic = range.topic
val partition = range.partition
val offset = range.untilOffset
// Key is topic_groupid, field is partition, value is offset
jedis.hset(s"${topic}_${groupid}", partition.toString, offset.toString)
}
}
}
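As a quick sanity check, the two MySQL methods can be exercised together. The OffsetRange below is made up purely for illustration:
// Hypothetical round trip: save one offset range, then read it back from MySQL
val ranges = Array(OffsetRange("test", 0, 0L, 100L))
OffsetReadAndSave.saveOffsetRanges("group_test", ranges)
// Should print something like Map(test-0 -> 100)
println(OffsetReadAndSave.getOffsetMap("group_test", "test"))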
- Business processing class
This object contains the business logic: it consumes Kafka data and then manually saves the offsets to MySQL. At startup it checks whether an offset exists in the external store. On the first start, consumption begins from the initial position; if an offset exists, consumption resumes from that offset.
Observation: on the first start, data is consumed from the beginning. Stop the program manually and start it again, and you will see that consumption resumes from the most recently committed offset.
object ManualCommitOffset {
def main(args: Array[String]): Unit = {
val brokers = ConfigConstants.kafkaBrokers
val groupId = ConfigConstants.groupId
val topics = ConfigConstants.kafkaTopics
val batchInterval = ConfigConstants.batchInterval
val conf = new SparkConf()
.setAppName(ManualCommitOffset.getClass.getSimpleName)
.setMaster("local[1]")
.set("spark.serializer",ConfigConstants.sparkSerializer)
val ssc = new StreamingContext(conf, batchInterval)
// Checkpointing must be enabled, otherwise an error is reported
ssc.checkpoint(ConfigConstants.checkpointDir)
ssc.sparkContext.setLogLevel("OFF")
// Create a direct kafka stream using brokers and topics
val topicSet = topics.split(" ").toSet
// Kafka connection parameters
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
)
// Read the partition offsets of this consumer group for the topic from MySQL
val offsetMap = OffsetReadAndSave.getOffsetMap(groupId, topics)
var inputDStream: InputDStream[ConsumerRecord[String, String]] = null
// If an offset already exists in MySQL, start consuming from that offset
if (offsetMap.size > 0) {
println("There is an offset, consume from that offset!!")
inputDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))
} else {
// If no offset exists in MySQL, start consuming from the earliest
inputDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
}
// The checkpoint interval must be an integer multiple of batchInterval
inputDStream.checkpoint(ConfigConstants.checkpointInterval)
// Holds this batch's offsets
var offsetRanges = Array[OffsetRange]()
// Get the message offsets of the current DStream
val transformDS = inputDStream.transform { rdd =>
// Obtain the offsets
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
/** State update function. @param newValues new values @param stateValue state value @return */
def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
var oldvalue = stateValue.getOrElse(0) // Get the state value
// Iterate over the current data and update the state
for (newValue <- newValues) {
oldvalue += newValue
}
// Return the latest state
Option(oldvalue)
}
// Business logic processing
// This example counts the message keys to see if data is consumed from committed offsets
transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()
// Print the offsets and data and observe the output
transformDS.foreachRDD { (rdd, time) =>
// Iterate over and print the RDD data
rdd.foreach { record =>
println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
}
// Print the consumed offset information
for (o <- offsetRanges) {
println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
}
// Save the offsets to MySQL
OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)
}
ssc.start()
ssc.awaitTermination()
}
}
How to use Redis to manage Kafka offsets
- Redis connection class
object JedisConnPool {
val config = new JedisPoolConfig
// Maximum number of connections
config.setMaxTotal(60)
// Maximum number of idle connections
config.setMaxIdle(10)
config.setTestOnBorrow(true)
// Server IP address
val redisAddress :String = ConfigConstants.redisAddress.toString
// The port number
val redisPort:Int = ConfigConstants.redisPort.toInt
// Access password
val redisAuth :String = ConfigConstants.redisAuth.toString
// The maximum time to wait for available connections
val redisTimeout:Int = ConfigConstants.redisTimeout.toInt
val pool = new JedisPool(config, redisAddress, redisPort, redisTimeout, redisAuth)
def getConnection(): Jedis = {
pool.getResource
}
}
- Business logic processing
This object is essentially the same as the one above, but uses Redis to store the offsets. The Redis data type used is a Hash with the layout [key field value] -> [topic_groupid partition offset]; that is, the key is topic_groupid, the field is the partition, and the value is the offset. The short snippet below shows how to read one field back.
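As a small illustration of that layout (assuming the sample topic test and group group_test), one partition's offset can be read back directly with Jedis:
// Field "0" is partition 0; the value is the stored untilOffset, or null if nothing has been written yet
val jedis = JedisConnPool.getConnection()
val offsetOfPartition0 = jedis.hget("test_group_test", "0")
println(s"partition 0 offset: $offsetOfPartition0")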
object ManualCommitOffsetToRedis {
def main(args: Array[String]): Unit = {
val brokers = ConfigConstants.kafkaBrokers
val groupId = ConfigConstants.groupId
val topics = ConfigConstants.kafkaTopics
val batchInterval = ConfigConstants.batchInterval
val conf = new SparkConf()
.setAppName(ManualCommitOffsetToRedis.getClass.getSimpleName)
.setMaster("local[1]")
.set("spark.serializer", ConfigConstants.sparkSerializer)
val ssc = new StreamingContext(conf, batchInterval)
// Checkpoint must be enabled, otherwise an error will be reported
ssc.checkpoint(ConfigConstants.checkpointDir)
ssc.sparkContext.setLogLevel("OFF")
// Create a direct kafka stream using brokers and topics
val topicSet = topics.split(" ").toSet
// Kafka connection parameters
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
)
// Read the partitioning offset of the consumer group corresponding to this topic from Redis
val offsetMap = OffsetReadAndSave.getOffsetFromRedis(groupId, topics)
var inputDStream: InputDStream[ConsumerRecord[String, String]] = null
// If an offset already exists in Redis, consumption should start at that offset
if (offsetMap.size > 0) {
println("There is an offset, consume from that offset!!")
inputDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))
} else {
// If there is no offset in Redis, start consuming from the earliest
inputDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
}
// The checkpoint interval must be an integer multiple of batchInterval
inputDStream.checkpoint(ConfigConstants.checkpointInterval)
// Save the batch offset
var offsetRanges = Array[OffsetRange]()
// Get the message offsets of the current DStream
val transformDS = inputDStream.transform { rdd =>
// Obtain the offsets
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
/** State update function. @param newValues new values @param stateValue state value @return */
def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
var oldvalue = stateValue.getOrElse(0) // Get the status value
// Iterate over the current data and update the status
for (newValue <- newValues) {
oldvalue += newValue
}
// Returns the latest status
Option(oldvalue)
}
// Business logic processing
// This example counts the number of message keys to see if data is being consumed from committed offsets
transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()
// Print the offset and data information and observe the output result
transformDS.foreachRDD { (rdd, time) =>
// Print the RDD data iteratively
rdd.foreach { record =>
println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")}// Prints the consumption offset information
for (o <- offsetRanges) {
println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")}// Save the offset to Redis
OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)
}
ssc.start()
ssc.awaitTermination()
}
}
Conclusion
This article explained how to store Kafka consumption positions in external storage, with detailed code examples showing how to manage offsets with MySQL and Redis. Of course, many other external stores can be used for offset management, such as ZooKeeper and HBase; the basic approach is the same, as the sketch below illustrates.
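For example, a ZooKeeper-based variant could follow the same read/save pattern. The sketch below is only an outline under stated assumptions (Apache Curator on the classpath, a hypothetical ZooKeeper address, and offsets stored under znode paths of the form /kafka_offset/<group>/<topic>/<partition>), not a drop-in implementation:
object ZkOffsetStoreSketch {
  import org.apache.curator.framework.CuratorFrameworkFactory
  import org.apache.curator.retry.ExponentialBackoffRetry
  import org.apache.kafka.common.TopicPartition
  import org.apache.spark.streaming.kafka010.OffsetRange

  // Hypothetical ZooKeeper connection string; adjust to your environment
  private val client = CuratorFrameworkFactory.newClient("kms-2:2181", new ExponentialBackoffRetry(1000, 3))
  client.start()

  private def path(group: String, topic: String, partition: Int): String =
    s"/kafka_offset/$group/$topic/$partition"

  /** Save each partition's untilOffset, mirroring saveOffsetRanges above. */
  def saveOffsets(group: String, ranges: Array[OffsetRange]): Unit =
    ranges.foreach { r =>
      val p = path(group, r.topic, r.partition)
      if (client.checkExists().forPath(p) == null) client.create().creatingParentsIfNeeded().forPath(p)
      client.setData().forPath(p, r.untilOffset.toString.getBytes("UTF-8"))
    }

  /** Read offsets back, mirroring getOffsetMap above. */
  def readOffsets(group: String, topic: String): Map[TopicPartition, Long] = {
    import scala.collection.JavaConverters._
    val parent = s"/kafka_offset/$group/$topic"
    if (client.checkExists().forPath(parent) == null) Map.empty
    else client.getChildren.forPath(parent).asScala.map { part =>
      new TopicPartition(topic, part.toInt) -> new String(client.getData.forPath(s"$parent/$part"), "UTF-8").toLong
    }.toMap
  }
}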
Big data Technology and data warehouse