Dependencies

github.com/Shopify/sarama       // main Kafka client library
github.com/bsm/sarama-cluster   // Kafka consumer groups
Producer
package producer
import (
"fmt"
"github.com/HappyTeemo7569/teemoKit/tlog"
"github.com/Shopify/sarama"
"kafkaDemo/define"
)
// ProducerId is the identifier handed to the next Producer that is set up
// through InitProducer; it starts at 1.
var ProducerId = 1
type Producer struct {
Producer sarama.SyncProducer
Topic string / / theme
ProducerID int // Producer Id
MessageId int
}
func (p *Producer) InitProducer(a) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // After sending data, both the leader and follow need to confirm it
config.Producer.Partitioner = sarama.NewRandomPartitioner // Select a new partition
config.Producer.Return.Successes = true // Successfully delivered messages will be returned in success Channel
/ / connected kafka
client, err := sarama.NewSyncProducer([]string{define.SERVER_LIST}, config)
iferr ! =nil {
tlog.Error("producer closed, err:", err)
return
}
p.Producer = client
p.Topic = define.TOPIC
p.ProducerID = ProducerId
p.MessageId = 1
ProducerId++
}
func (p *Producer) SendMessage(a) {
// Construct a message
msg := &sarama.ProducerMessage{}
msg.Topic = p.Topic
txt := fmt.Sprintf("ProducerID:%d this is a test log %d",
p.ProducerID, p.MessageId)
msg.Value = sarama.StringEncoder(txt)
// Send a message
pid, offset, err := p.Producer.SendMessage(msg)
//_, _, err := client.SendMessage(msg)
iferr ! =nil {
fmt.Println("send msg failed, err:", err)
return
}
tlog.Info(fmt.Sprintf("ProducerID:%d pid:%v offset:%v msg:%s",
p.ProducerID, pid, offset, txt))
p.MessageId++
}
// Close shuts down the underlying Kafka producer. It does nothing when the
// producer was never initialized, and logs any error reported while closing.
func (p *Producer) Close() {
	if p.Producer == nil {
		return
	}
	if err := p.Producer.Close(); err != nil {
		tlog.Error("close producer failed, err:", err)
	}
}
Consumer
package consumer
import (
"github.com/HappyTeemo7569/teemoKit/tlog"
"github.com/Shopify/sarama"
"kafkaDemo/define"
)
// Consumer bundles a sarama consumer with the topic it reads from and an
// identifier used to label its log output.
type Consumer struct {
	Consumer   sarama.Consumer // underlying Kafka consumer
	Topic      string          // topic that messages are read from
	ConsumerId int             // identifier assigned to this consumer by InitConsumer
}
func (c *Consumer) InitConsumer(a) error {
consumer, err := sarama.NewConsumer([]string{define.SERVER_LIST}, nil)
iferr ! =nil {
return err
}
c.Consumer = consumer
c.Topic = define.TOPIC
c.ConsumerId = ConsumerId
ConsumerId++
return nil
}
/ / specified partition
// Offset can be specified. Pass -1 to get the latest offest
func (c *Consumer) GetMessage(partitionId int32, offset int64) {
if offset == - 1 {
offset = sarama.OffsetNewest
}
pc, err := c.Consumer.ConsumePartition(c.Topic, partitionId, offset)
iferr ! =nil {
tlog.Error("failed to start consumer for partition %d,err:%v", partitionId, err)
//That topic/partition is already being consumed
return
}
// Consume information asynchronously from each partition
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
tlog.Info("ConsumerId:%d Partition:%d Offset:%d Key:%v Value:%v", c.ConsumerId, msg.Partition, msg.Offset, msg.Key, string(msg.Value))
}
}(pc)
}
// Iterate over all partitions
func (c *Consumer) GetMessageToAll(offset int64) {
partitionList, err := c.Consumer.Partitions(c.Topic) // Select all partitions according to topic
iferr ! =nil {
tlog.Error("fail to get list of partition:err%v", err)
return
}
tlog.Info("All of the partition.", partitionList)
for partition := range partitionList { // Iterate over all partitions
c.GetMessage(int32(partition), offset)
}
}
The main function
func main(a) {
tlog.Info("Start")
go producer.Put()
go consumer.Get()
for {
time.Sleep(time.Hour * 60)}}func Put(a) {
producer := new(Producer)
producer.InitProducer()
go func(a) {
for {
producer.SendMessage()
time.Sleep(1 * time.Second)
}
}()
}
func Get(a) {
offest := int64(0)
consumer := new(Consumer)
err := consumer.InitConsumer()
iferr ! =nil {
tlog.Error("fail to init consumer, err:%v", err)
return
}
consumer.GetMessageToAll(offest)
}
The full source is available at: https://github.com/HappyTeemo7569/kafkaDemo