“This is the 22nd day of my participation in the November Gwen Challenge. See details of the event: The Last Gwen Challenge 2021”.
The Kafka Go client is not officially available, but there are two very popular libraries on Github
- More stars, online cases are also more github.com/Shopify/sar…
- Confluent library github.com/confluentin…
Sarama is used here, because there are many stars and many cases, which makes it easy to get started quickly
Pay attention to
If kafka version in 2.2 the following, need to go. The mod will sarama version inside to github.com/Shopify/sarama v1.24.1
This is because Sarama only comes with the latest 2 releases plus a 2 month compatibility guarantee, so using the lower version kafka is a pitfall
If you use a non-cluster producer, you need to create your own topic. If you use a cluster producer, the cluster will be created automatically
example
package main
import (
"fmt"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"time"
)
var (
Consumer *cluster.Consumer
producer sarama.SyncProducer
brokers = []string{"ip1:9092"."ip2:9092"."ip3:9092"}
topic = "testGo"
groupId = "testGo_test1"
)
func initProducer() {
var err error
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Retry.Max = 3
config.Producer.Return.Successes = true
brokers := brokers
producer, err = sarama.NewSyncProducer(brokers,config)
iferr ! = nil { fmt.Printf("Producer initialization failed -> %v \n", err)
panic(err)
}
fmt.Println("Producer initialization succeeded.")
}
func initConsumer() {
var err error
config := cluster.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Consumer.Offsets.Initial = sarama.OffsetNewest
Consumer, err = cluster.NewConsumer(brokers,groupId,[]string{topic},config)
iferr ! = nil { fmt.Printf("Consumer initialization failed -> %v \n", err)
panic(err.Error())
}
if Consumer == nil {
panic(fmt.Sprintf(Kafka info -> {brokers:%v, topic: %v, group: %v}", brokers, topic, groupId))
}
fmt.Printf("Consumer -> %v, topic -> %v,", Consumer, topic)
}
func main() {
initProducer()
initConsumer()
// Production message
for i := 1; i <100; i ++ {
pid, offset, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(i),
Value: sarama.ByteEncoder("this is test message."})),iferr ! = nil { fmt.Println("Failed to send message, err:", err)
return
}
fmt.Printf("offset: %v\n", offset)
}
time.Sleep(2 * time.Second)
// Consume messages
for {
select {
case msg, ok: =<-Consumer.Messages(): if ok { fmt.Printf("kafka msg: %s \n", msg.Value) } } } }Copy the code
The result is as follows
The demo process is as follows
- Introduce standalone sarama library and cluster Sarama library
- Defining connection variables
- Instantiate a producer using the single Sarama library
- Instantiate a consumer using the clustered Sarama library
- Loop 100 times to send 100 messages
- Use sarama’s own production message constructor to set the message content
- Use for to keep the process listening for messages from Kafka